1.在QuorumPeerMain的main方法中,調用main.initializeAndRun(args)啟動節點對象。
/**
 * Parses the configuration (only when exactly one argument is given), starts the
 * data-directory purge task, and then runs either as a quorum peer or, when no
 * quorum servers are configured, as a standalone server.
 *
 * @param args command-line arguments; a single element is treated as the config file path
 * @throws ConfigException if the config file cannot be parsed
 * @throws IOException     on I/O failure while starting the server
 */
protected void initializeAndRun(String[] args) throws ConfigException,IOException
{
    final boolean singleConfigArg = (args.length == 1);

    QuorumPeerConfig peerConfig = new QuorumPeerConfig();
    if (singleConfigArg) {
        peerConfig.parse(args[0]);
    }

    // Start and schedule the purge task that cleans up the data / data-log directories.
    DatadirCleanupManager cleanupManager = new DatadirCleanupManager(
            peerConfig.getDataDir(),
            peerConfig.getDataLogDir(),
            peerConfig.getSnapRetainCount(),
            peerConfig.getPurgeInterval());
    cleanupManager.start();

    // Quorum mode needs a parsed config file that lists at least one server.
    boolean runAsQuorum = singleConfigArg && peerConfig.servers.size() > 0;
    if (!runAsQuorum) {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
        return;
    }
    runFromConfig(peerConfig);
}
2.initializeAndRun方法內部通過調用runFromConfig(QuorumPeerConfig config)方法載入節點的配置並啟動節點:先調用其start(),再調用join()使主線程阻塞,直到節點線程結束。
/**
 * Builds a QuorumPeer from the parsed configuration, starts it, and blocks the
 * calling thread until the peer thread terminates.
 *
 * @param config the parsed quorum configuration
 * @throws IOException if the connection factory or the transaction log / snapshot
 *                     store cannot be set up
 */
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        // JMX control of log4j is optional; keep starting without it.
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns());

        quorumPeer = new QuorumPeer();
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                new File(config.getDataLogDir()),
                new File(config.getDataDir())));
        quorumPeer.setQuorumPeers(config.getServers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

        // Start the peer thread (QuorumPeer.run) ...
        quorumPeer.start();
        // ... and block this thread until that peer thread exits.
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
        // Restore the interrupt status so callers further up the stack can observe it.
        Thread.currentThread().interrupt();
    }
}
3.調用QuorumPeer類的start()方法啟動節點,start方法內容如下:
/**
 * Starts this quorum peer. It loads the database, starts the client connection
 * factory, prepares leader election, and finally starts the peer's own thread.
 * The method is synchronized on this peer.
 */
@Override
public synchronized void start() {
loadDataBase(); // load persisted data
cnxnFactory.start(); // start accepting client connections (the factory is netty- or NIO-based)
startLeaderElection(); // create the initial vote and the election algorithm
super.start(); // QuorumPeer extends Thread, so this runs QuorumPeer.run() on a new thread
}
//cnxnFactory使用netty方式時,其start方法代碼如下:綁定localAddress,啟動了一個channel
/**
 * Netty connection factory start: logs the address, then binds the server
 * bootstrap to {@code localAddress} and keeps the returned channel as the
 * parent channel.
 */
@Override
public void start() {
    LOG.info("binding to port " + localAddress);
    this.parentChannel = this.bootstrap.bind(this.localAddress);
}
startLeaderElection是發起選舉的方法,使用synchronized保證同一時刻只有一個線程執行該方法:
/**
 * Prepares this peer for leader election. It creates the initial vote for
 * itself, resolves this peer's quorum address from the configured view, and
 * instantiates the configured election algorithm. It is synchronized so that
 * only one thread runs it at a time.
 *
 * @throws RuntimeException if the last logged zxid or the current epoch cannot be read,
 *         if this peer's id is not in the peer list, or if the UDP socket used by
 *         election type 0 cannot be opened
 */
synchronized public void startLeaderElection() {
    try {
        // Initial vote: this peer proposes itself with its last logged zxid and current epoch.
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch (IOException e) {
        // Keep the original message but chain the IOException as the cause. The old code
        // only copied its stack trace, which dropped the root failure from the exception.
        throw new RuntimeException(e.getMessage(), e);
    }

    // Find this peer's own quorum address in the configured view by matching myid.
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }

    // Election type 0 additionally needs a UDP socket on the quorum port and a responder thread.
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }

    // Instantiate the configured election algorithm.
    this.electionAlg = createElectionAlgorithm(electionType);
}
4.createElectionAlgorithm根據配置生成選舉算法,代碼如下:
/**
 * Creates the leader-election implementation selected by {@code electionAlgorithm}.
 *
 * <ul>
 *   <li>0: LeaderElection (deprecated since ZooKeeper 3.4, per the original notes)</li>
 *   <li>1: AuthFastLeaderElection (deprecated)</li>
 *   <li>2: AuthFastLeaderElection with authentication enabled (deprecated)</li>
 *   <li>3: FastLeaderElection on top of a new QuorumCnxManager; its listener is started first</li>
 * </ul>
 *
 * @param electionAlgorithm the configured election type
 * @return the election implementation, or {@code null} for type 3 when the
 *         connection manager has no listener (an error is logged)
 * @throws IllegalArgumentException for an unknown election type. Previously this
 *         was {@code assert false}, which silently returned {@code null} when
 *         assertions were disabled.
 */
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // Connection manager that maintains connections to the other peers.
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            // Start accepting election connections before creating FLE on top of the manager.
            listener.start();
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        // Fail fast: a null election algorithm would otherwise surface later as
        // repeated exceptions in the LOOKING state.
        throw new IllegalArgumentException(
                "Unknown election algorithm: " + electionAlgorithm);
    }
    return le;
}
5.QuorumCnxManager對象主要維護與各個節點之間的連接。其內部類Listener負責啟動一個ServerSocket來接收連接,綁定ServerSocket最多重試三次;RecvWorker負責接收消息,SendWorker負責發送數據。
6.FastLeaderElection類是選舉類,主要的選舉過程都在該類中實現。首先調用該類中的starter方法,完成一些初始化工作:
/**
 * Initializes the election state of this FastLeaderElection instance. No leader
 * or zxid is proposed yet, the send and receive queues are fresh, and a
 * Messenger is built on the given connection manager.
 *
 * @param self    the local quorum peer
 * @param manager connection manager used to exchange election messages
 */
private void starter(QuorumPeer self, QuorumCnxManager manager) {
    // -1 means "nothing proposed yet".
    this.proposedLeader = -1;
    this.proposedZxid = -1;
    this.self = self;

    this.sendqueue = new LinkedBlockingQueue<ToSend>();
    this.recvqueue = new LinkedBlockingQueue<Notification>();

    // Kept last, as in the original: the Messenger may rely on the fields above.
    this.messenger = new Messenger(manager);
}
到這里,選舉所需的初始化工作基本完成。
7.隨後QuorumPeer調用super.start(),啟動線程並執行本類的run方法,開始真正的選舉過程。
@Override
public void run() {
setName("QuorumPeer" + "[myid=" + getId() + "]" +
cnxnFactory.getLocalAddress());
LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
p = new RemotePeerBean(s);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
//以上注冊JMX
try {
/*
* Main loop
*/
//開始監(jiān)聽節(jié)點的狀態(tài),根據(jù)狀態(tài)調(diào)用相應(yīng)的方法
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
//節(jié)點狀態(tài)是否為只讀
if (Boolean.getBoolean("readonlymode.enabled")) {
//只讀模式暫時先不討論
.......省略只讀代碼
} else {
try {
setBCVote(null);
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
try {
MBeanRegistry.getInstance().unregisterAll();
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
}
}
8.run中調用makeLEStrategy()返回FLE對象,並調用其lookForLeader()方法,如下:
/**
 * Runs one round of Fast Leader Election for this peer.
 *
 * The peer first bumps logicalclock, proposes itself, and broadcasts that
 * proposal. It then consumes notifications from recvqueue while it is LOOKING
 * and stop is false:
 * - No notification within notTimeout: re-broadcast, or reconnect if messages
 *   are still undelivered. The timeout then doubles, capped at
 *   maxNotificationInterval.
 * - Notification from a LOOKING voter: the peer adopts the sender's round
 *   and/or proposal when it wins under totalOrderPredicate, and records the vote
 *   in recvset. It finishes once termPredicate holds and no better proposal
 *   arrives within finalizeWait.
 * - Notification from a FOLLOWING/LEADING voter: the peer records it and
 *   finishes once ooePredicate accepts the announced leader.
 * - Notifications from OBSERVING peers or non-voting sids are only logged.
 *
 * On success the peer state becomes LEADING if this peer is the chosen leader,
 * otherwise learningState(). The final vote is returned.
 *
 * @return the final vote, or null if the loop ends undecided (state no longer
 *         LOOKING, or stop set)
 * @throws InterruptedException if interrupted while polling or re-queuing notifications
 */
public Vote lookForLeader() throws InterruptedException {
// Register the leader-election JMX bean; on failure the election proceeds without it.
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
// Remember when this election started; lead() logs end_fle - start_fle.
self.start_fle = System.currentTimeMillis();
}
try {
// Votes from peers taking part in the current round, keyed by sender sid.
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
// Votes from peers already FOLLOWING or LEADING (outside the election), keyed by sender sid.
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
// Current poll timeout; grows by exponential backoff while nothing arrives.
int notTimeout = finalizeWait;
// Start a new round under this object's monitor: bump the logical clock and
// propose this peer itself (its initial id, last logged zxid and peer epoch).
synchronized(this){
logicalclock++;
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// Broadcast the initial self-proposal to the other peers.
sendNotifications();
/*
 * Loop in which we exchange notifications until we find a leader.
 * It runs only while this peer is still LOOKING and the election is not stopped.
 */
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
 * Remove the next notification from the queue, waiting at most notTimeout ms.
 */
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
 * Sends more notifications if haven't received enough.
 * Otherwise processes new notification.
 */
if(n == null){
// Nothing arrived in time. If every queued message was delivered,
// re-broadcast the proposal; otherwise try to (re)connect to all peers.
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
/*
 * Exponential backoff
 */
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
else if(self.getVotingView().containsKey(n.sid)) {
/*
 * Only proceed if the vote comes from a replica in the
 * voting view.
 */
switch (n.state) {
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock) {
// The sender is in a newer round. Jump to that round, drop the votes
// collected for the old round, and choose between the sender's proposal
// and this peer's own initial proposal. Then re-broadcast.
logicalclock = n.electionEpoch;
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock) {
// Stale round: ignore the notification (the vote is not recorded).
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
// Same round, and the received proposal beats ours: adopt it and re-broadcast.
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// Record the sender's vote for this round, replacing any earlier vote from the same sid.
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// termPredicate: does recvset already give our current proposal enough support in this round?
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) {
// Verify if there is any change in the proposed leader.
// Keep polling, waiting up to finalizeWait each time. If a better proposal shows up,
// put it back on recvqueue and break so the outer loop processes it.
// Notifications that do not beat the current proposal are discarded here.
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
 * This predicate is true once we don't read any new
 * relevant message from the reception queue.
 * The election is decided: become LEADING if we are the proposed leader,
 * otherwise take learningState(), and return the final vote.
 */
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock,
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
// Observers do not vote; only log.
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
/*
 * Consider all notifications from the same epoch
 * together.
 */
if(n.electionEpoch == logicalclock){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
 * Before joining an established ensemble, verify
 * a majority is following the same leader.
 */
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {
// Join the established ensemble: adopt its round and our role under this object's monitor.
synchronized(this){
logicalclock = n.electionEpoch;
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
// Left the loop undecided (peer no longer LOOKING, or stop was set).
return null;
} finally {
// Always try to unregister the election JMX bean (best effort).
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
}
}
9.選舉出LEADER之後,分兩種情況:如果本節點是LEADER,則peer的狀態為LEADING;否則根據LearnerType的類型分為FOLLOWING和OBSERVING。
在QuorumPeer中,狀態變更的關鍵代碼如下:
{
    // Excerpt of the peer-state switch in QuorumPeer.run(). When a role ends, either
    // normally or through an exception, the role object is shut down and the peer
    // returns to LOOKING.
    case OBSERVING:
        try {
            LOG.info("OBSERVING");
            setObserver(makeObserver(logFactory));
            observer.observeLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
        } finally {
            // observer may still be null if makeObserver threw. Guard it like LEADING
            // does, so an NPE cannot escape the finally block.
            if (observer != null) {
                observer.shutdown();
                setObserver(null);
            }
            setPeerState(ServerState.LOOKING);
        }
        break;
    case FOLLOWING:
        try {
            LOG.info("FOLLOWING");
            setFollower(makeFollower(logFactory));
            follower.followLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
        } finally {
            // follower may still be null if makeFollower threw.
            if (follower != null) {
                follower.shutdown();
                setFollower(null);
            }
            setPeerState(ServerState.LOOKING);
        }
        break;
    case LEADING:
        LOG.info("LEADING");
        try {
            setLeader(makeLeader(logFactory));
            leader.lead();
            setLeader(null);
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
        } finally {
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                setLeader(null);
            }
            setPeerState(ServerState.LOOKING);
        }
        break;
    }
}
首先我們關注LEADING狀態的處理:makeLeader創建一個Leader對象並通過setLeader保存,關鍵在leader.lead()方法。代碼如下:
/**
 * Runs this peer as the leader until it loses its quorum or fails.
 *
 * Steps:
 * 1. Log how long the election took.
 * 2. Load data and start accepting learner connections.
 * 3. Settle the new epoch with a quorum: epoch proposal, ACKEPOCH, then NEWLEADER ACK.
 * 4. Start the ZooKeeper server.
 * 5. Ping learners every half tick, and return once the synced participants no
 *    longer form a quorum.
 *
 * @throws IOException propagated from the calls below (e.g. loading data)
 * @throws InterruptedException if interrupted while waiting for epoch acks or sleeping
 */
void lead() throws IOException, InterruptedException {
self.end_fle = System.currentTimeMillis();
LOG.info("LEADING - LEADER ELECTION TOOK - " +
(self.end_fle - self.start_fle));
self.start_fle = 0;
self.end_fle = 0;
// Register this leader's JMX bean under the local peer bean (unregistered in the finally below).
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
self.tick = 0;
// Load the database into the leader's ZooKeeperServer.
zk.loadData();
// Summary of this leader's state: its current epoch and last processed zxid.
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start thread that waits for connection requests from
// new followers (learners).
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
readyToStart = true;
// Propose a new epoch based on this leader's accepted epoch. Learner handlers call
// getEpochToPropose too (with each learner's accepted epoch). Per the original notes,
// this blocks until a quorum has reported and yields one more than the largest
// accepted epoch. TODO confirm against getEpochToPropose.
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// The new zxid is the epoch in the high 32 bits with a counter of 0 in the low 32 bits.
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
// Record the last proposed zxid under this leader's monitor.
synchronized(this){
lastProposed = zk.getZxid();
}
// NEWLEADER proposal carrying the new zxid.
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);
// Log when the low 32 bits (counter) of the NEWLEADER zxid are not zero.
if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
LOG.info("NEWLEADER proposal has Zxid of "
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
}
// Wait for epoch acknowledgements. Each LearnerHandler calls leader.waitForEpochAck
// after its learner answers LEADERINFO with ACKEPOCH (or immediately for old
// protocol versions). Presumably all callers block until a quorum has acked; TODO confirm.
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
// We have to get at least a majority of servers in sync with
// us. We do this by waiting for the NEWLEADER packet to get
// acknowledged. LearnerHandlers call waitForNewLeaderAck once
// their learner ACKs NEWLEADER.
try {
waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
} catch (InterruptedException e) {
// No quorum acked NEWLEADER. Shut down, and warn if enough learners were
// connected anyway (the limits may be too small). Then wait one tick and return.
shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ getSidSetString(newLeaderProposal.ackSet) + " ]");
HashSet<Long> followerSet = new HashSet<Long>();
for (LearnerHandler f : learners)
followerSet.add(f.getSid());
if (self.getQuorumVerifier().containsQuorum(followerSet)) {
LOG.warn("Enough followers present. "
+ "Perhaps the initTicks need to be increased.");
}
Thread.sleep(self.tickTime);
self.tick++;
return;
}
// Start the leader's ZooKeeperServer.
startZkServer();
/**
 * WARNING: do not use this for anything other than QA testing
 * on a real cluster. Specifically to enable verification that quorum
 * can handle the lower 32bit roll-over issue identified in
 * ZOOKEEPER-1277. Without this option it would take a very long
 * time (on order of a month say) to see the 4 billion writes
 * necessary to cause the roll-over to occur.
 *
 * This field allows you to override the zxid of the server. Typically
 * you'll want to set it to something like 0xfffffff0 and then
 * start the quorum, run some operations and see the re-election.
 */
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
long zxid = Long.parseLong(initialZxid);
zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
}
// Unless zookeeper.leaderServes is "no", the leader also serves client connections.
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.cnxnFactory.setZooKeeperServer(zk);
}
// Everything is a go, simply start counting the ticks
// WARNING: I couldn't find any wait statement on a synchronized
// block that would be notified by this notifyAll() call, so
// I commented it out
//synchronized (this) {
// notifyAll();
//}
// We ping twice a tick, so we only update the tick every other
// iteration
boolean tickSkip = true;
while (true) {
Thread.sleep(self.tickTime / 2);
if (!tickSkip) {
self.tick++;
}
// The leader itself plus every synced PARTICIPANT learner.
HashSet<Long> syncedSet = new HashSet<Long>();
// lock on the followers when we use it.
syncedSet.add(self.getId());
for (LearnerHandler f : getLearners()) {
// Synced set is used to check we have a supporting quorum, so only
// PARTICIPANT, not OBSERVER, learners should be used
if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
syncedSet.add(f.getSid());
}
f.ping();
}
// Checked once per full tick: give up leadership if the synced set is not a quorum.
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
//if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
// Lost quorum, shutdown
shutdown("Not sufficient followers synced, only synced with sids: [ "
+ getSidSetString(syncedSet) + " ]");
// make sure the order is the same!
// the leader goes to looking
return;
}
tickSkip = !tickSkip;
}
} finally {
zk.unregisterJMX(this);
}
}
10.看下Learner和LearnerHandler之間的相互通信。LearnerHandler的run方法如下:
/**
 * Serves one learner connection on the leader side.
 *
 * Protocol, in order:
 * 1. Read FOLLOWERINFO/OBSERVERINFO to identify the learner (sid, protocol version).
 * 2. Settle the new epoch: LEADERINFO, then ACKEPOCH (protocol version >= 0x10000).
 * 3. Choose a sync strategy (SNAP, DIFF or TRUNC) from the learner's last zxid and
 *    the leader's committed log.
 * 4. Send NEWLEADER plus the sync data, then start a sender thread for queued packets.
 * 5. Wait for the learner's ACK of NEWLEADER and for the leader server to be running.
 * 6. Queue UPTODATE, then process ACK / PING / REVALIDATE / REQUEST packets forever.
 *
 * Any IOException or InterruptedException ends the handler. The socket is closed
 * if still open, and shutdown() is always called.
 */
public void run() {
try {
// The initial ack deadline allows initLimit + syncLimit ticks. It is reset to
// tick + syncLimit for every packet read in the main loop below.
tickOfNextAckDeadline = leader.self.tick
+ leader.self.initLimit + leader.self.syncLimit;
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
// The first packet must identify the learner: FOLLOWERINFO or OBSERVERINFO.
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}
// Payload: exactly 8 bytes is a bare sid; anything else is a serialized LearnerInfo
// (sid + protocol version).
byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
if (learnerInfoData.length == 8) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
this.sid = bbsid.getLong();
} else {
LearnerInfo li = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
this.sid = li.getServerid();
this.version = li.getProtocolVersion();
}
} else {
// No sid sent: assign a synthetic one from followerCounter. This calls
// getAndDecrement(), so it hands out an id; it does not count followers.
this.sid = leader.followerCounter.getAndDecrement();
}
LOG.info("Follower sid: " + sid + " : info : "
+ leader.self.quorumPeers.get(sid));
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}
// Epoch the learner last accepted, taken from the zxid in the learner's info packet.
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
// Contribute the learner's accepted epoch and obtain the new epoch. Per the original
// notes this waits until a quorum has reported; TODO confirm against getEpochToPropose.
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
// Old learners (protocol version < 0x10000) get no LEADERINFO: their state summary
// is derived from their zxid. Newer learners receive LEADERINFO carrying the new
// epoch and must answer with ACKEPOCH (their current epoch + last zxid).
// In both cases the handler then calls leader.waitForEpochAck.
if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
leader.waitForEpochAck(this.getSid(), ss);
} else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();
// Decide how to synchronize the learner: SNAP (default), DIFF or TRUNC.
/* the default to send to the follower */
int packetToSend = Leader.SNAP;
long zxidToSend = 0;
long leaderLastZxid = 0;
/** the packets that the follower needs to get updates from **/
long updates = peerLastZxid;
/* we are sending the diff check if we have proposals in memory to be able to
 * send a diff to the
 */
// Hold the ZKDatabase log lock in read mode while inspecting the committed log
// and registering this handler for forwarding.
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
// Range of zxids currently held in the in-memory committed log.
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LOG.info("Synchronizing with Follower sid: " + sid
+" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
+" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
+" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
// Committed proposals kept in memory.
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
if (proposals.size() != 0) {
LOG.debug("proposal size is {}", proposals.size());
if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
LOG.debug("Sending proposals to follower");
// as we look through proposals, this variable keeps track of previous
// proposal Id.
long prevProposalZxid = minCommittedLog;
// Keep track of whether we are about to send the first packet.
// Before sending the first packet, we have to tell the learner
// whether to expect a trunc or a diff
boolean firstPacket=true;
// If we are here, we can use committedLog to sync with
// follower. Then we only need to decide whether to
// send trunc or not
packetToSend = Leader.DIFF;
zxidToSend = maxCommittedLog;
/*
 * Sync strategy, given the committed log range [minCommittedLog, maxCommittedLog]:
 * a) peerLastZxid inside the range: DIFF. Proposals the learner already has
 *    (zxid <= peerLastZxid) are skipped. Each later proposal is queued, followed
 *    by a COMMIT. If peerLastZxid does not match a committed proposal
 *    (prevProposalZxid < peerLastZxid when the first proposal is queued), the
 *    learner holds entries the leader never committed. In that case TRUNC to
 *    prevProposalZxid is sent instead of DIFF, followed by the same proposals.
 * b) peerLastZxid > maxCommittedLog: TRUNC the learner back to maxCommittedLog.
 * c) otherwise (learner older than minCommittedLog): keep the default SNAP.
 * With an empty committed log, DIFF is used only if the learner is already at
 * the data tree's last processed zxid; otherwise SNAP.
 */
for (Proposal propose: proposals) {
// skip the proposals the peer already has
if (propose.packet.getZxid() <= peerLastZxid) {
prevProposalZxid = propose.packet.getZxid();
continue;
} else {
// If we are sending the first packet, figure out whether to trunc
// in case the follower has some proposals that the leader doesn't
if (firstPacket) {
firstPacket = false;
// Does the peer have some proposals that the leader hasn't seen yet
if (prevProposalZxid < peerLastZxid) {
// send a trunc message before sending the diff
packetToSend = Leader.TRUNC;
zxidToSend = prevProposalZxid;
updates = zxidToSend;
}
}
queuePacket(propose.packet);
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
null, null);
queuePacket(qcommit);
}
}
} else if (peerLastZxid > maxCommittedLog) {
LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
Long.toHexString(maxCommittedLog),
Long.toHexString(updates));
packetToSend = Leader.TRUNC;
zxidToSend = maxCommittedLog;
updates = zxidToSend;
} else {
LOG.warn("Unhandled proposal scenario");
}
} else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
// The leader may recently take a snapshot, so the committedLog
// is empty. We don't need to send snapshot if the follow
// is already sync with in-memory db.
LOG.debug("committedLog is empty but leader and follower "
+ "are in sync, zxid=0x{}",
Long.toHexString(peerLastZxid));
packetToSend = Leader.DIFF;
zxidToSend = peerLastZxid;
} else {
// just let the state transfer happen
LOG.debug("proposals is empty");
}
LOG.info("Sending " + Leader.getPacketType(packetToSend));
// Register this handler with the leader for forwarding, starting after `updates`.
// The returned leader zxid is only logged below when a snapshot is sent.
// Presumably this queues outstanding proposals newer than `updates`; TODO confirm.
leaderLastZxid = leader.startForwarding(this, updates);
} finally {
rl.unlock();
}
// NEWLEADER carries zxid makeZxid(newEpoch, 0). Old learners get it written
// immediately; newer learners get it queued behind the sync packets above.
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
ZxidUtils.makeZxid(newEpoch, 0), null, null);
if (getVersion() < 0x10000) {
oa.writeRecord(newLeaderQP, "packet");
} else {
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
//Need to set the zxidToSend to the latest zxid
// For SNAP, advertise the zxid of the in-memory data tree being serialized.
if (packetToSend == Leader.SNAP) {
zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
}
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
bufferedOutput.flush();
/* if we are not truncating or sending a diff just send a snapshot */
// The snapshot is serialized directly onto the stream, followed by the "BenWasHere" signature.
if (packetToSend == Leader.SNAP) {
LOG.info("Sending snapshot last zxid of peer is 0x"
+ Long.toHexString(peerLastZxid) + " "
+ " zxid of leader is 0x"
+ Long.toHexString(leaderLastZxid)
+ "sent zxid of db as 0x"
+ Long.toHexString(zxidToSend));
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
}
bufferedOutput.flush();
// Start sending packets: a dedicated thread drains queuedPackets via sendPackets().
new Thread() {
public void run() {
Thread.currentThread().setName(
"Sender-" + sock.getRemoteSocketAddress());
try {
sendPackets();
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption",e);
}
}
}.start();
/*
 * Have to wait for the first ACK, wait until
 * the leader is ready, and only then we can
 * start processing messages.
 * The learner must ACK NEWLEADER; then waitForNewLeaderAck is called with that ACK.
 */
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.ACK){
LOG.error("Next packet was supposed to be an ACK");
return;
}
LOG.info("Received NEWLEADER-ACK message from " + getSid());
leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
syncLimitCheck.start();
// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
/*
 * Wait until leader starts up (polling every 20 ms on leader.zk's monitor)
 */
synchronized(leader.zk){
while(!leader.zk.isRunning() && !this.isInterrupted()){
leader.zk.wait(20);
}
}
// Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
// UPTODATE is queued after everything above. From here on, the handler just
// processes packets from the learner.
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
// Every received packet pushes the ack deadline out by syncLimit ticks.
tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit;
ByteBuffer bb;
long sessionId;
int cxid;
int type;
switch (qp.getType()) {
case Leader.ACK:
// Acknowledgement of a proposal: hand it to the leader.
if (this.learnerType == LearnerType.OBSERVER) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ACK from Observer " + this.sid);
}
}
syncLimitCheck.updateAck(qp.getZxid());
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING:
// Process the touches: the payload is a sequence of (sessionId long, timeout int) pairs.
ByteArrayInputStream bis = new ByteArrayInputStream(qp
.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
leader.zk.touch(sess, to);
}
break;
case Leader.REVALIDATE:
// Revalidate one session (id, timeout). Reply with the same packet whose data
// is the id plus a validity flag; if valid, this learner becomes the session owner.
bis = new ByteArrayInputStream(qp.getData());
dis = new DataInputStream(bis);
long id = dis.readLong();
int to = dis.readInt();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(id);
boolean valid = leader.zk.touch(id, to);
if (valid) {
try {
//set the session owner
// as the follower that
// owns the session
leader.zk.setOwner(id, this);
} catch (SessionExpiredException e) {
LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
}
}
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(id)
+ " is valid: "+ valid);
}
dos.writeBoolean(valid);
qp.setData(bos.toByteArray());
queuedPackets.add(qp);
break;
case Leader.REQUEST:
// Forwarded client request: header (sessionId, cxid, type) followed by the request body.
// sync requests are wrapped in LearnerSyncRequest; all others in a plain Request.
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if(type == OpCode.sync){
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
leader.zk.submitRequest(si);
break;
default:
}
}
} catch (IOException e) {
if (sock != null && !sock.isClosed()) {
LOG.error("Unexpected exception causing shutdown while sock "
+ "still open", e);
//close the socket to make sure the
//other side can see it being close
try {
sock.close();
} catch(IOException ie) {
// do nothing
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected exception causing shutdown", e);
} finally {
LOG.warn("******* GOODBYE "
+ (sock != null ? sock.getRemoteSocketAddress() : "<null>")
+ " ********");
shutdown();
}
}
隨便寫了點,給自己留個記錄。第一次寫,沒什麼經驗,有什麼不對的地方也請大家指出。