Preface
In the previous article, ZooKeeper ZAB Leader Elect Source Code Analysis, I analyzed ZooKeeper's leader election in detail. This article analyzes how the Leader and the Followers initialize.
Figure: initialization overview
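After the leader is elected, the leader and the followers enter the cluster-formation and data-synchronization phase, which consists of the following steps:
1. Connection setup
The leader starts LearnerCnxAcceptorHandler listeners on the first port configured for its address in zoo.cfg (port1 in server.N=host:port1:port2) to accept follower connection requests.
2. Determining the new epoch
The newly formed ensemble needs a new epoch to mark that all of its members are working in the same round.
3. Data synchronization
Once the new epoch is agreed on, the ensemble recovers data. When recovery finishes, the request-processing pipelines of the followers and the leader start, and the ensemble can serve clients.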
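To keep the three steps straight, here is a minimal Java sketch of the ZAB phases that the code below switches between via setZabState. The enum ZabPhase and its helpers are hypothetical; only the phase names come from QuorumPeer.ZabState. Steps 1 and 2 both happen in DISCOVERY, step 3 is SYNCHRONIZATION, and serving clients is BROADCAST.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the ZAB phases a server moves through after leader election.
// The constant names mirror QuorumPeer.ZabState used later in this article; the
// transition helpers are illustrative and are not part of ZooKeeper.
enum ZabPhase {
    ELECTION,        // leader election
    DISCOVERY,       // learners connect and the new epoch is agreed (steps 1 and 2)
    SYNCHRONIZATION, // learners catch up with the leader (step 3)
    BROADCAST;       // the ensemble serves clients

    // Returns the following phase; BROADCAST stays put until a new election starts.
    ZabPhase next() {
        return this == BROADCAST ? BROADCAST : values()[ordinal() + 1];
    }

    // Walks from ELECTION to BROADCAST and returns the phases visited, in order.
    static List<ZabPhase> walk() {
        List<ZabPhase> visited = new ArrayList<>();
        ZabPhase phase = ELECTION;
        visited.add(phase);
        while (phase != BROADCAST) {
            phase = phase.next();
            visited.add(phase);
        }
        return visited;
    }
}
```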
Now let's walk through the source code in detail.
Leader
After several rounds of voting, the leader is elected, and the QuorumPeer of the elected node enters the LEADING branch:
while (running) {
switch (getPeerState()) {
case LOOKING: .....
case FOLLOWING: .....
case OBSERVING: ......
case LEADING:
LOG.info("LEADING");
try {
// create the Leader object
setLeader(makeLeader(logFactory));
// take charge of the ensemble as its leader
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
}
QuorumPeer creates the Leader object
Leader extends LearnerMaster. Leader has too many fields to introduce here; I'll explain each one in detail when we run into it.
new Leader
// LeaderZooKeeperServer extends ZooKeeperServer and represents this node's ZooKeeper service in the Leader role
public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
this.self = self;
this.proposalStats = new BufferStats();
// get the addresses and ports the leader listens on
Set<InetSocketAddress> addresses;
if (self.getQuorumListenOnAllIPs()) {
addresses = self.getQuorumAddress().getWildcardAddresses();
} else {
addresses = self.getQuorumAddress().getAllAddresses();
}
addresses.stream()
.map(address -> createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum()))
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(serverSockets::add);
if (serverSockets.isEmpty()) {
throw new IOException("Leader failed to initialize any of the following sockets: " + addresses);
}
this.zk = zk;
}
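The constructor opens one listening socket per address and fails only when none could be opened. A minimal sketch of that stream pattern, with a hypothetical openAll helper and a pluggable factory standing in for createServerSocket (which returns an Optional in the code above):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Sketch of the pattern in the Leader constructor: try to open one listener per
// configured address, keep those that succeeded, and fail only if none did.
// openAll and the factory parameter are hypothetical stand-ins.
final class ListenerSetup {
    private ListenerSetup() {}

    static <A, S> List<S> openAll(Collection<A> addresses, Function<A, Optional<S>> factory)
            throws IOException {
        List<S> opened = new ArrayList<>();
        addresses.stream()
                .map(factory)                // attempt to open a listener for each address
                .filter(Optional::isPresent) // drop addresses that failed
                .map(Optional::get)
                .forEach(opened::add);
        if (opened.isEmpty()) {
            throw new IOException("failed to initialize any of the following sockets: " + addresses);
        }
        return opened;
    }
}
```

Skipping failed addresses instead of aborting lets a leader with several configured addresses keep serving on the ones that did bind.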
new LeaderZooKeeperServer
Constructing LeaderZooKeeperServer ends up calling the ZooKeeperServer constructor below. I analyzed this code in my article on the standalone ZooKeeper server startup, so I won't repeat it here.
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
serverStats = new ServerStats(this);
this.txnLogFactory = txnLogFactory;
this.txnLogFactory.setServerStats(this.serverStats);
this.zkDb = zkDb;
this.tickTime = tickTime;
setMinSessionTimeout(minSessionTimeout);
setMaxSessionTimeout(maxSessionTimeout);
this.listenBacklog = clientPortListenBacklog;
this.reconfigEnabled = reconfigEnabled;
listener = new ZooKeeperServerListenerImpl(this);
readResponseCache = new ResponseCache(Integer.getInteger(
GET_DATA_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));
getChildrenResponseCache = new ResponseCache(Integer.getInteger(
GET_CHILDREN_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));
this.initialConfig = initialConfig;
this.requestPathMetricsCollector = new RequestPathMetricsCollector();
this.initLargeRequestThrottlingSettings();
LOG.info(
"Created server with"
+ " tickTime {}"
+ " minSessionTimeout {}"
+ " maxSessionTimeout {}"
+ " clientPortListenBacklog {}"
+ " datadir {}"
+ " snapdir {}",
tickTime,
getMinSessionTimeout(),
getMaxSessionTimeout(),
getClientPortListenBacklog(),
txnLogFactory.getDataDir(),
txnLogFactory.getSnapDir());
}
We'll come back to LeaderZooKeeperServer when we analyze the leader's request processing chain; for now, let's set it aside.
Leader.lead
Once created, the Leader enters the leading process through lead(). This method is long, so we'll read it section by section. Here is the first section:
// set the ZAB state to DISCOVERY
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
self.tick.set(0);
// load local data; normally the database was already loaded during leader election, so loadData() here mostly just takes a local snapshot
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start thread that waits for connection requests from
// new followers.
// create the leader-side acceptor manager, which starts the LearnerCnxAcceptorHandler threads that wait for follower connections
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
// lead() blocks here until a quorum of followers has connected, then a new epoch is generated
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
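getEpochToPropose is where the connecting followers and the leader meet. The sketch below is a simplified, hypothetical reimplementation of the idea described in this article: every caller reports its last accepted epoch, the proposal becomes one more than the largest epoch seen, and callers block on a shared monitor until a quorum that includes the leader has arrived or the initLimit * tickTime budget runs out. It assumes a plain majority of ensembleSize voters instead of ZooKeeper's QuorumVerifier and skips the isParticipant check.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical sketch of Leader.getEpochToPropose (not ZooKeeper's code).
final class EpochProposer {
    private final int ensembleSize;   // number of voting members (assumed simple majority)
    private final long leaderId;      // sid of the leader itself
    private final long timeoutMs;     // stands in for initLimit * tickTime
    private final Set<Long> connecting = new HashSet<>();
    private long epoch = -1;
    private boolean waitingForNewEpoch = true;

    EpochProposer(int ensembleSize, long leaderId, long timeoutMs) {
        this.ensembleSize = ensembleSize;
        this.leaderId = leaderId;
        this.timeoutMs = timeoutMs;
    }

    long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
        synchronized (connecting) {
            if (!waitingForNewEpoch) {
                return epoch; // quorum already reached: late arrivals get the agreed epoch
            }
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch + 1; // one more than the largest epoch seen so far
            }
            connecting.add(sid);
            if (connecting.contains(leaderId) && connecting.size() > ensembleSize / 2) {
                waitingForNewEpoch = false;
                connecting.notifyAll(); // release everyone blocked below
            } else {
                long now = System.currentTimeMillis();
                long end = now + timeoutMs;
                while (waitingForNewEpoch && now < end) {
                    connecting.wait(end - now); // releases the monitor while waiting
                    now = System.currentTimeMillis();
                }
                if (waitingForNewEpoch) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
                }
            }
            return epoch;
        }
    }
}
```

With a three-server ensemble, a leader reporting epoch 4 and a follower reporting epoch 6 both get 7 back, whichever arrives first; a server arriving after the quorum has formed gets 7 immediately.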
LearnerCnxAcceptor
During lead(), the Leader creates a LearnerCnxAcceptor. Here is its run() implementation:
public void run() {
if (!stop.get() && !serverSockets.isEmpty()) {
ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size());
CountDownLatch latch = new CountDownLatch(serverSockets.size());
// create one LearnerCnxAcceptorHandler per address the leader is bound to
serverSockets.forEach(serverSocket ->
executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch)));
try {
latch.await();
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping in LearnerCnxAcceptor.", ie);
} finally {
closeSockets();
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.error("not all the LearnerCnxAcceptorHandler terminated properly");
}
} catch (InterruptedException ie) {
LOG.error("Interrupted while terminating LearnerCnxAcceptor.", ie);
}
}
}
}
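LearnerCnxAcceptor itself does little more than fan out one LearnerCnxAcceptorHandler per server socket and wait for all of them on a CountDownLatch. A minimal sketch of that structure, with plain Runnables as hypothetical stand-ins for the handlers:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the LearnerCnxAcceptor structure: one worker per listening socket on a
// fixed-size pool, a latch sized to the number of workers, and a coordinator that
// waits for every worker before shutting the pool down. AcceptorPool is hypothetical.
final class AcceptorPool {
    private AcceptorPool() {}

    // Returns true if the pool terminated within one second after all workers finished.
    static boolean runAll(List<Runnable> workers) throws InterruptedException {
        if (workers.isEmpty()) {
            return true; // mirrors the serverSockets.isEmpty() guard: nothing to run
        }
        ExecutorService executor = Executors.newFixedThreadPool(workers.size());
        CountDownLatch latch = new CountDownLatch(workers.size());
        for (Runnable worker : workers) {
            executor.submit(() -> {
                try {
                    worker.run();
                } finally {
                    latch.countDown(); // always release, even if the worker throws
                }
            });
        }
        try {
            latch.await(); // block until every worker has exited
        } finally {
            executor.shutdown();
        }
        return executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Counting down in finally is what lets the coordinator return even when a worker dies with an exception, which is also why LearnerCnxAcceptorHandler.run decrements the latch in its finally block.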
LearnerCnxAcceptorHandler
The leader-side thread that accepts follower connection requests.
Let's look at LearnerCnxAcceptorHandler.run():
public void run() {
try {
Thread.currentThread().setName("LearnerCnxAcceptorHandler-" + serverSocket.getLocalSocketAddress());
while (!stop.get()) {
// acceptConnections() makes the leader's ServerSocket accept follower connections
acceptConnections();
}
} catch (Exception e) {
LOG.warn("Exception while accepting follower", e);
if (fail.compareAndSet(false, true)) {
handleException(getName(), e);
halt();
}
} finally {
latch.countDown();
}
}
LearnerCnxAcceptorHandler.acceptConnections
private void acceptConnections() throws IOException {
Socket socket = null;
boolean error = false;
try {
// accept a follower connection
socket = serverSocket.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
// set the socket read timeout
socket.setSoTimeout(self.tickTime * self.initLimit);
socket.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
// create a LearnerHandler thread that represents this follower and handles its requests
LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
fh.start();
} catch (SocketException e) {
error = true;
if (stop.get()) {
LOG.warn("Exception while shutting down acceptor.", e);
} else {
throw e;
}
} catch (SaslException e) {
LOG.error("Exception while connecting to quorum learner", e);
error = true;
} catch (Exception e) {
error = true;
throw e;
} finally {
// Don't leak sockets on errors
if (error && socket != null && !socket.isClosed()) {
try {
socket.close();
} catch (IOException e) {
LOG.warn("Error closing socket: " + socket, e);
}
}
}
}
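The timeout set in acceptConnections is plain arithmetic over zoo.cfg settings. A worked example, assuming the sample values tickTime=2000, initLimit=10 and syncLimit=5 (not taken from this article); the helper class is hypothetical:

```java
// Worked example of the learner socket timeouts. A freshly accepted socket gets
// tickTime * initLimit; per the source comment, LearnerHandler switches to
// tickTime * syncLimit once the initial ack is processed. LearnerTimeouts is hypothetical.
final class LearnerTimeouts {
    private LearnerTimeouts() {}

    // Read timeout while a learner connects and syncs.
    static int initTimeoutMs(int tickTime, int initLimit) {
        return tickTime * initLimit;
    }

    // Read timeout once the learner has been acked.
    static int syncTimeoutMs(int tickTime, int syncLimit) {
        return tickTime * syncLimit;
    }
}
```

With those values a new learner gets 20000 ms per read while connecting and syncing, and 10000 ms afterwards.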
LearnerHandler is the leader-side representation of a follower.
I'll hold off on the details until we get to the Follower below.
Follower
So far we have covered part of the leader's initialization. Why stop here? Because the next steps only make sense together with what the follower does, so let's move on to the follower's initialization.
After the leader is elected, each non-leader node switches itself to a follower (observers are out of scope here) and enters followLeader(). Here is the FOLLOWING branch of QuorumPeer:
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
Creating the Follower
Creating a follower first creates a FollowerZooKeeperServer. FollowerZooKeeperServer extends ZooKeeperServer and represents the ZooKeeper instance on a follower node, but it has its own request processing chain, which I'll analyze in detail when we cover request processing.
new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
follower.followLeader
Once the Follower is created, it interacts with the leader through followLeader():
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
long connectionTime = 0;
boolean completedSync = false;
try {
// move the ZAB state from ELECTION to DISCOVERY
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
// find the leader node from the election result
QuorumServer leaderServer = findLeader();
try {
// open a socket connection to the leader
connectToLeader(leaderServer.addr, leaderServer.hostname);
connectionTime = System.currentTimeMillis();
// register with the leader and receive the new epoch; from here on this node is a legitimate follower in the ensemble
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
}
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch "
+ ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch "
+ ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
long startTime = Time.currentElapsedTime();
try {
// record the leader's address and sid
self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
// switch the ZAB state to SYNCHRONIZATION to start data sync
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
// synchronize data with the leader
syncWithLeader(newEpochZxid);
// after syncing, switch the ZAB state to BROADCAST: the last ZAB phase is reached and the node can serve clients
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync = true;
} finally {
long syncTime = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
}
if (self.getObserverMasterPort() > 0) {
LOG.info("Starting ObserverMaster");
om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
om.start();
} else {
om = null;
}
// create a reusable packet to reduce gc impact todo: at this point follower is ready to serve
QuorumPacket qp = new QuorumPacket();
// the follower now loops, receiving and processing messages from the leader
while (this.isRunning()) {
// read a message from the leader
readPacket(qp);
// process the message (message handling is analyzed in a later article)
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
closeSocket();
// clear pending revalidations
pendingRevalidations.clear();
}
} finally {
if (om != null) {
om.stop();
}
zk.unregisterJMX(this);
if (connectionTime != 0) {
long connectionDuration = System.currentTimeMillis() - connectionTime;
LOG.info(
"Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
leaderAddr,
connectionDuration,
completedSync);
messageTracker.dumpToLog(leaderAddr.toString());
}
}
}
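followLeader compares epochs extracted from zxids. As I understand ZxidUtils, a zxid keeps the epoch in its high 32 bits and a per-epoch counter in its low 32 bits. The sketch below reimplements that layout under hypothetical names, together with the stale-epoch check from followLeader:

```java
// Hypothetical stand-ins for ZxidUtils.makeZxid / getEpochFromZxid, assuming the
// layout epoch (high 32 bits) | counter (low 32 bits).
final class Zxids {
    private Zxids() {}

    static long make(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // The safety check in followLeader(): a leader proposing an epoch older than the
    // one this follower already accepted must be rejected.
    static boolean leaderEpochIsStale(long newEpochZxid, long acceptedEpoch) {
        return epochOf(newEpochZxid) < acceptedEpoch;
    }
}
```

This layout is also why a counter of 0 marks the very first zxid of an epoch, which syncFollower relies on later.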
connectToLeader
How the follower connects to the leader. Let's go straight to the implementation:
protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
this.leaderAddr = multiAddr;
Set<InetSocketAddress> addresses;
if (self.isMultiAddressReachabilityCheckEnabled()) {
// even if none of the addresses are reachable, we want to try to establish connection
// see ZOOKEEPER-3758
addresses = multiAddr.getAllReachableAddressesOrAll();
} else {
addresses = multiAddr.getAllAddresses();
}
ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
CountDownLatch latch = new CountDownLatch(addresses.size());
AtomicReference<Socket> socket = new AtomicReference<>(null);
// create one LeaderConnector per leader address; LeaderConnector is the task that actually connects to the leader
addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);
try {
latch.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while trying to connect to Leader", e);
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.error("not all the LeaderConnector terminated properly");
}
} catch (InterruptedException ie) {
LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
}
}
if (socket.get() == null) {
throw new IOException("Failed connect to " + multiAddr);
} else {
sock = socket.get();
}
self.authLearner.authenticate(sock, hostname);
// LeaderConnector has established the socket connection to the leader;
// leaderIs wraps the socket's input stream
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
// leaderOs wraps the socket's output stream
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}
LeaderConnector
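LeaderConnector's only job is to establish the follower's socket connection to the leader. It runs as a task on its own thread; once the socket to the leader is established, its job is done and the thread ends. There is nothing remarkable in its implementation, so I won't analyze it here.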
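Even without reading LeaderConnector, connectToLeader shows its contract: one attempt per leader address, the first successful socket stored in an AtomicReference, and a latch counted down by every attempt. A hypothetical generic sketch of that "first connection wins" pattern, with Supplier-based attempts in place of real sockets; closing the losing connections is my assumption about how a redundant connection is handled:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch: race one attempt per address and keep the first success.
final class FirstConnectionWins {
    private FirstConnectionWins() {}

    static <C extends Closeable> C connect(List<Supplier<Optional<C>>> attempts)
            throws IOException, InterruptedException {
        if (attempts.isEmpty()) {
            throw new IOException("no addresses to connect to");
        }
        ExecutorService executor = Executors.newFixedThreadPool(attempts.size());
        CountDownLatch latch = new CountDownLatch(attempts.size());
        AtomicReference<C> winner = new AtomicReference<>(null);
        for (Supplier<Optional<C>> attempt : attempts) {
            executor.submit(() -> {
                try {
                    Optional<C> conn = attempt.get();
                    // only the first successful attempt may publish its connection
                    if (conn.isPresent() && !winner.compareAndSet(null, conn.get())) {
                        conn.get().close(); // redundant connection: another attempt already won
                    }
                } catch (IOException e) {
                    // failing to close a redundant connection is ignored in this sketch
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await(); // wait until every attempt has finished
        } finally {
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        }
        C result = winner.get();
        if (result == null) {
            throw new IOException("Failed to connect to any address");
        }
        return result;
    }
}
```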
Follower.registerWithLeader
The follower registers its information with the leader:
protected long registerWithLeader(int pktType) throws IOException {
/*
* Send follower info, including last zxid and sid
*/
// the largest zxid this follower has logged locally
long lastLoggedZxid = self.getLastLoggedZxid();
// QuorumPacket is the serialization carrier for messages between the leader and followers
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
/*
* Add sid to payload
*/
// LearnerInfo describes this follower
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
// serialize the follower info (sid, protocol version, quorum verifier version) into the ByteArrayOutputStream, then store it as the QuorumPacket data
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
// send the QuorumPacket to the leader over the socket
writePacket(qp, true);
// wait for the leader's reply; the leader sends back the new epoch
readPacket(qp);
// extract the new epoch from the leader's reply
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
if (qp.getType() == Leader.LEADERINFO) {
// we are connected to a 1.0 server so accept the new epoch and read the next packet
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
byte[] epochBytes = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
if (newEpoch > self.getAcceptedEpoch()) {
// report our current epoch, then persist the new epoch to this node's acceptedEpoch file: the follower has joined the new round
wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
self.setAcceptedEpoch(newEpoch);
} else if (newEpoch == self.getAcceptedEpoch()) {
// the new epoch equals our acceptedEpoch: nothing to persist, but reply with -1 so this ACKEPOCH is not counted as an ack
// since we have already acked an epoch equal to the leaders, we cannot ack
// again, but we still need to send our lastZxid to the leader so that we can
// sync with it if it does assume leadership of the epoch.
// the -1 indicates that this reply should not count as an ack for the new epoch
wrappedEpochBytes.putInt(-1);
} else {
// our acceptedEpoch is greater than the new epoch: this node is already in a later round than the one the new leader proposes, so fail
throw new IOException("Leaders epoch, "
+ newEpoch
+ " is less than accepted epoch, "
+ self.getAcceptedEpoch());
}
// send the leader an ACKEPOCH for the new epoch (carrying our last zxid and our current epoch)
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
writePacket(ackNewEpoch, true);
return ZxidUtils.makeZxid(newEpoch, 0);
} else {
if (newEpoch > self.getAcceptedEpoch()) {
self.setAcceptedEpoch(newEpoch);
}
if (qp.getType() != Leader.NEWLEADER) {
LOG.error("First packet should have been NEWLEADER");
throw new IOException("First packet should have been NEWLEADER");
}
return qp.getZxid();
}
}
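The ACKEPOCH body is the subtle part of registerWithLeader. A minimal sketch that isolates those three branches into a pure function; AckEpochPayload.build is hypothetical, not a ZooKeeper method:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical sketch of how registerWithLeader() fills the 4-byte ACKEPOCH body.
final class AckEpochPayload {
    private AckEpochPayload() {}

    static byte[] build(long newEpoch, long acceptedEpoch, long currentEpoch) throws IOException {
        byte[] epochBytes = new byte[4];
        ByteBuffer wrapped = ByteBuffer.wrap(epochBytes);
        if (newEpoch > acceptedEpoch) {
            // first time we see this epoch: report our current epoch
            // (the caller then persists newEpoch as acceptedEpoch)
            wrapped.putInt((int) currentEpoch);
        } else if (newEpoch == acceptedEpoch) {
            // already acked an equal epoch: -1 tells the leader not to count this reply
            wrapped.putInt(-1);
        } else {
            throw new IOException("Leaders epoch, " + newEpoch
                    + " is less than accepted epoch, " + acceptedEpoch);
        }
        return epochBytes;
    }
}
```

On the leader side, LearnerHandler reads this int back as the follower's current epoch in a StateSummary, and waitForEpochAck only counts the ack when that value is not -1.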
Let me pause here. Now that the follower has sent a message to the leader, let's see how the leader handles it.
As mentioned above, the leader creates a LearnerHandler thread to handle each follower's requests. Let's look at LearnerHandler.run():
LearnerHandler.run
LearnerHandler handles all communication with its follower. The code is long, so I'll go through it in sections. The first section receives the epoch sent by the follower:
public void run() {
try {
// add this LearnerHandler, which represents the follower, to the learnerMaster's list of learners
learnerMaster.addLearnerHandler(this);
tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();
// set up the input and output streams
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
QuorumPacket qp = new QuorumPacket();
// read the first packet the follower sends (it carries the follower's accepted epoch)
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());
return;
}
if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
}
// get the message body
byte[] learnerInfoData = qp.getData();
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
// read the follower's sid from the body
this.sid = bbsid.getLong();
}
if (learnerInfoData.length >= 12) {
// read protocolVersion
this.version = bbsid.getInt(); // protocolVersion
}
if (learnerInfoData.length >= 20) {
// read configVersion
long configVersion = bbsid.getLong();
if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
this.sid = learnerMaster.getAndDecrementFollowerCounter();
}
// look up the follower's configured address and ports by sid
String followerInfo = learnerMaster.getPeerInfo(this.sid);
if (followerInfo.isEmpty()) {
LOG.info(
"Follower sid: {} not in the current config {}",
this.sid,
Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
} else {
LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
}
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}
learnerMaster.registerLearnerHandlerBean(this, sock);
// getEpochFromZxid extracts the epoch the follower sent
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
// learnerMaster.getEpochToPropose: the leader's QuorumPeer thread blocks in getEpochToPropose() inside lead() until more than half of the participants have arrived, and only then runs the rest of lead()
// each arriving follower in the same round updates the leader's participant count; once the followers plus the leader form a quorum, the leader's QuorumPeer thread stops waiting and continues, and the new epoch is returned, marking the start of a new round
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
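The length checks above follow the layout that registerWithLeader writes: LearnerInfo(sid, 0x10000, configVersion), serialized as a long, an int and a long. A minimal decoding sketch, assuming big-endian encoding (which I believe jute's BinaryOutputArchive uses, and which is java.nio.ByteBuffer's default); the class, the fallback sid parameter and the default values are hypothetical:

```java
import java.nio.ByteBuffer;

// Hypothetical decoder for the FOLLOWERINFO body: bytes 0-7 sid, 8-11 protocol
// version, 12-19 config version. Each field is optional, as in LearnerHandler.run().
final class LearnerInfoView {
    static final int DEFAULT_PROTOCOL_VERSION = 0x1; // assumed default when absent
    static final long NO_CONFIG_VERSION = -1L;       // hypothetical "absent" marker

    final long sid;
    final int protocolVersion;
    final long configVersion;

    private LearnerInfoView(long sid, int protocolVersion, long configVersion) {
        this.sid = sid;
        this.protocolVersion = protocolVersion;
        this.configVersion = configVersion;
    }

    static LearnerInfoView parse(byte[] data, long fallbackSid) {
        long sid = fallbackSid; // ZooKeeper assigns one via getAndDecrementFollowerCounter when data is null
        int version = DEFAULT_PROTOCOL_VERSION;
        long configVersion = NO_CONFIG_VERSION;
        if (data != null) {
            ByteBuffer buf = ByteBuffer.wrap(data); // big-endian by default
            if (data.length >= 8) {
                sid = buf.getLong();
            }
            if (data.length >= 12) {
                version = buf.getInt();
            }
            if (data.length >= 20) {
                configVersion = buf.getLong();
            }
        }
        return new LearnerInfoView(sid, version, configVersion);
    }
}
```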
Section 2: the leader sends the newly generated epoch to the follower
// build the leader's new zxid from the new epoch
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
learnerMaster.waitForEpochAck(this.getSid(), ss);
} else {
byte[] ver = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
// build the LEADERINFO packet that carries the new leader zxid to the follower
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
// send the packet over the socket
oa.writeRecord(newEpochPacket, "packet");
messageTracker.trackSent(Leader.LEADERINFO);
bufferedOutput.flush();
// read the follower's ACKEPOCH for the new epoch
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
messageTracker.trackReceived(ackEpochPacket.getType());
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
return;
}
Now back to Leader.lead(): what happens once the main thread returns from getEpochToPropose()?
// obtain the newly generated epoch
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// derive the zxid from the new epoch
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
synchronized (this) {
lastProposed = zk.getZxid();
}
// build the NEWLEADER packet announcing this node as the leader
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid()));
}
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
QuorumVerifier curQV = self.getQuorumVerifier();
if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
try {
LOG.debug(String.format("set lastSeenQuorumVerifier to currentQuorumVerifier (%s)", curQV.toString()));
QuorumVerifier newQV = self.configFromString(curQV.toString());
newQV.setVersion(zk.getZxid());
self.setLastSeenQuorumVerifier(newQV, true);
} catch (Exception e) {
throw new IOException(e);
}
}
newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
// wait for a quorum of participants to ack the new epoch
waitForEpochAck(self.getId(), leaderStateSummary);
At this point the leader is waiting for a quorum of participants to ack the new epoch.
Back in LearnerHandler, let's see what happens after it reads the follower's ack for the new epoch:
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
// after receiving the follower's ack for the new epoch, call the leader's waitForEpochAck()
learnerMaster.waitForEpochAck(this.getSid(), ss);
waitForEpochAck
Let's analyze waitForEpochAck. Its code is fairly simple: it checks whether a quorum of participants has acked the new epoch. If so, the new epoch takes effect across the ensemble and becomes the epoch everyone agrees to use in this round.
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
// electingFollowers holds the sids of the participants that have acked the new epoch
synchronized (electingFollowers) {
if (electionFinished) {
return;
}
if (ss.getCurrentEpoch() != -1) {
if (ss.isMoreRecentThan(leaderStateSummary)) {
throw new IOException("Follower is ahead of the leader, leader summary: "
+ leaderStateSummary.getCurrentEpoch()
+ " (current epoch), "
+ leaderStateSummary.getLastZxid()
+ " (last zxid)");
}
if (ss.getLastZxid() != -1 && isParticipant(id)) {
electingFollowers.add(id);
}
}
QuorumVerifier verifier = self.getQuorumVerifier();
// if a quorum of participants has acked the new epoch, this election round is officially complete
if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
electionFinished = true;
electingFollowers.notifyAll();
} else {
// otherwise wait until a quorum acks or the initLimit * tickTime deadline passes
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit() * self.getTickTime();
while (!electionFinished && cur < end) {
electingFollowers.wait(end - cur);
cur = Time.currentElapsedTime();
}
if (!electionFinished) {
throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
}
}
}
}
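waitForEpochAck rejects a follower that is ahead of the leader and does not count acks marked with -1. A sketch of those two checks, assuming StateSummary.isMoreRecentThan compares the current epoch first and the last zxid second; EpochSummary is a stand-in, not ZooKeeper's class:

```java
// Hypothetical stand-in for StateSummary and the checks waitForEpochAck applies to it.
final class EpochSummary {
    final long currentEpoch;
    final long lastZxid;

    EpochSummary(long currentEpoch, long lastZxid) {
        this.currentEpoch = currentEpoch;
        this.lastZxid = lastZxid;
    }

    // "Ahead" means a higher epoch, or the same epoch with a higher last zxid.
    boolean isMoreRecentThan(EpochSummary other) {
        return currentEpoch > other.currentEpoch
                || (currentEpoch == other.currentEpoch && lastZxid > other.lastZxid);
    }

    // An ack counts toward the quorum only if neither field carries the -1 marker
    // (ZooKeeper additionally requires the sender to be a participant).
    boolean countsAsAck() {
        return currentEpoch != -1 && lastZxid != -1;
    }
}
```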
Back on the follower side: after sending its ack for the new epoch, the follower enters the ZAB SYNCHRONIZATION phase. Let's look at the follower's syncWithLeader(); this method is also long, so we'll analyze it in sections.
protected void syncWithLeader(long newLeaderZxid) throws Exception {
// pre-build an ACK packet that is sent to the leader later in the sync
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
QuorumVerifier newLeaderQV = null;
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
boolean syncSnapshot = false;
// read the sync instruction (DIFF, TRUNC or SNAP) from the leader
readPacket(qp);
Now back to LearnerHandler: once a quorum of participants has accepted the new epoch, LearnerHandler enters syncFollower().
LearnerHandler.syncFollower
syncFollower decides how to bring the follower up to date by comparing the follower's last zxid (peerLastZxid) with the zxids the leader has processed.
boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) {
/*
* When leader election is completed, the leader will set its
* lastProcessedZxid to be (epoch < 32). There will be no txn associated
* with this zxid.
*
* The learner will set its lastProcessedZxid to the same value if
* it get DIFF or SNAP from the learnerMaster. If the same learner come
* back to sync with learnerMaster using this zxid, we will never find this
* zxid in our history. In this case, we will ignore TRUNC logic and
* always send DIFF if we have old enough history
*/
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
// Keep track of the latest zxid which already queued
long currentZxid = peerLastZxid;
boolean needSnap = true;
// get the leader's ZK database
ZKDatabase db = learnerMaster.getZKDatabase();
boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
ReentrantReadWriteLock lock = db.getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
// largest zxid in the leader's in-memory committedLog
long maxCommittedLog = db.getmaxCommittedLog();
// smallest zxid still kept in the leader's in-memory committedLog
long minCommittedLog = db.getminCommittedLog();
// the latest zxid the leader has applied to its data tree
// when maxCommittedLog > lastProcessedZxid,
// the leader has transactions in its log that the other servers have not acked yet
long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
LOG.info("Synchronizing with Learner sid: {} maxCommittedLog=0x{}"
+ " minCommittedLog=0x{} lastProcessedZxid=0x{}"
+ " peerLastZxid=0x{}",
getSid(),
Long.toHexString(maxCommittedLog),
Long.toHexString(minCommittedLog),
Long.toHexString(lastProcessedZxid),
Long.toHexString(peerLastZxid));
if (db.getCommittedLog().isEmpty()) {
/*
* It is possible that committedLog is empty. In that case
* setting these value to the latest txn in learnerMaster db
* will reduce the case that we need to handle
*
* Here is how each case handle by the if block below
* 1. lastProcessZxid == peerZxid -> Handle by (2)
* 2. lastProcessZxid < peerZxid -> Handle by (3)
* 3. lastProcessZxid > peerZxid -> Handle by (5)
*/
minCommittedLog = lastProcessedZxid;
maxCommittedLog = lastProcessedZxid;
}
/*
* The comments below list the sync strategies syncFollower has to handle
* Here are the cases that we want to handle
*
* 1. Force sending snapshot (for testing purpose)
// if the follower's zxid already equals the leader's, just send the follower an empty DIFF
* 2. Peer and learnerMaster is already sync, send empty diff
// if the follower's zxid is greater than the leader's, send TRUNC so the follower deletes the transactions beyond that zxid;
// but if the follower sent a newEpochZxid, it has no txn log in that epoch, so TRUNC cannot be sent
* 3. Follower has txn that we haven't seen. This may be old leader
* so we need to send TRUNC. However, if peer has newEpochZxid,
* we cannot send TRUNC since the follower has no txnlog
// if the follower's zxid falls within the leader's committedLog range, send DIFF
* 4. Follower is within committedLog range or already in-sync.
* We may need to send DIFF or TRUNC depending on follower's zxid
* We always send empty DIFF if follower is already in-sync
// if the follower's zxid is below minCommittedLog, restore it from the leader's on-disk txn log plus committedLog;
// if that fails, fall back to a full SNAP
* 5. Follower missed the committedLog. We will try to use on-disk
* txnlog + committedLog to sync with follower. If that fail,
* we will send snapshot
*/
// the five sync rules listed above are applied below
if (forceSnapSync) {
// Force learnerMaster to use snapshot to sync with follower
LOG.warn("Forcing snapshot sync - should not see this in production");
} else if (lastProcessedZxid == peerLastZxid) {
// Follower is already sync with us, send empty diff
// the follower's zxid equals the leader's: send a DIFF with no transactions after it
LOG.info(
"Sending DIFF zxid=0x{} for peer sid: {}",
Long.toHexString(peerLastZxid),
getSid());
queueOpPacket(Leader.DIFF, peerLastZxid);
needOpPacket = false;
needSnap = false;
} else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
// the follower's zxid is greater than the largest zxid the leader has committed: send TRUNC
// so the follower drops its extra transactions and keeps only those up to maxCommittedLog
// Newer than committedLog, send trunc and done
LOG.debug(
"Sending TRUNC to follower zxidToSend=0x{} for peer sid:{}",
Long.toHexString(maxCommittedLog),
getSid());
queueOpPacket(Leader.TRUNC, maxCommittedLog);
currentZxid = maxCommittedLog;
needOpPacket = false;
needSnap = false;
} else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
// peerLastZxid lies between minCommittedLog and maxCommittedLog: the follower is behind, so send it the leader's transactions in (peerLastZxid, maxCommittedLog]
// Follower is within commitLog range
LOG.info("Using committedLog for peer sid: {}", getSid());
Iterator<Proposal> itr = db.getCommittedLog().iterator();
// queue the transactions the follower is missing
currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
needSnap = false;
} else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
// Use txnlog and committedLog to sync
// peerLastZxid is below minCommittedLog: the follower is far behind, so the leader must send the transactions in (peerLastZxid, maxCommittedLog].
// [minCommittedLog, maxCommittedLog] is cached in memory in committedLog and (peerLastZxid, minCommittedLog) is in the on-disk txn log, so both sources are used
// Calculate sizeLimit that we allow to retrieve txnlog from disk
long sizeLimit = db.calculateTxnLogSizeLimit();
// This method can return empty iterator if the requested zxid
// is older than on-disk txnlog
// get an iterator over the txn log starting at peerLastZxid, bounded by sizeLimit
Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
if (txnLogItr.hasNext()) {
LOG.info("Use txnlog and committedLog for peer sid: {}", getSid());
// queue the proposals read from the txn log; currentZxid becomes the last zxid queued
currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);
if (currentZxid < minCommittedLog) {
// currentZxid < minCommittedLog means a gap between the txn log and committedLog, so send a SNAP instead of a DIFF
LOG.info(
"Detected gap between end of txnlog: 0x{} and start of committedLog: 0x{}",
Long.toHexString(currentZxid),
Long.toHexString(minCommittedLog));
currentZxid = peerLastZxid;
// Clear out currently queued requests and revert
// to sending a snapshot.
queuedPackets.clear();
needOpPacket = true;
} else {
LOG.debug("Queueing committedLog 0x{}", Long.toHexString(currentZxid));
Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
// queueCommittedProposals normally queues every transaction newer than peerLastZxid for the follower.
// Edge cases: if peerLastZxid never appears in the log (the follower has a transaction the leader never saw), a TRUNC is queued instead; if it does appear, a DIFF is queued.
// If the first logged zxid newer than peerLastZxid belongs to a different epoch, TRUNC cannot cross the epoch boundary, so the follower needs a full SNAP
currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
needSnap = false;
}
}
// closing the resources
if (txnLogItr instanceof TxnLogProposalIterator) {
TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
txnProposalItr.close();
}
} else {
LOG.warn(
"Unhandled scenario for peer sid: {} maxCommittedLog=0x{}"
+ " minCommittedLog=0x{} lastProcessedZxid=0x{}"
+ " peerLastZxid=0x{} txnLogSyncEnabled={}",
getSid(),
Long.toHexString(maxCommittedLog),
Long.toHexString(minCommittedLog),
Long.toHexString(lastProcessedZxid),
Long.toHexString(peerLastZxid),
txnLogSyncEnabled);
}
if (needSnap) {
currentZxid = db.getDataTreeLastProcessedZxid();
}
LOG.debug("Start forwarding 0x{} for peer sid: {}", Long.toHexString(currentZxid), getSid());
leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
} finally {
rl.unlock();
}
if (needOpPacket && !needSnap) {
// fall back to sending the follower a snapshot for a full restore
// This should never happen, but we should fall back to sending
// snapshot just in case.
LOG.error("Unhandled scenario for peer sid: {} fall back to use snapshot", getSid());
needSnap = true;
}
// return whether a snapshot sync is needed
return needSnap;
}
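The five cases can be condensed into a decision table. The sketch below is a simplified, hypothetical classifier of which branch of syncFollower runs; it assumes the on-disk txn log reaches back far enough and meets committedLog, and it ignores the TRUNC/SNAP fallbacks inside queueCommittedProposals:

```java
// Which strategy syncFollower() would pick, per the five cases listed in its comments.
enum SyncMode { SNAP, EMPTY_DIFF, TRUNC, DIFF_FROM_COMMITTED_LOG, DIFF_FROM_TXNLOG }

// Hypothetical classifier; it only mirrors the if/else chain, it sends nothing.
final class SyncPlanner {
    private SyncPlanner() {}

    static SyncMode choose(boolean forceSnapSync, long peerLastZxid, long lastProcessedZxid,
                           long minCommittedLog, long maxCommittedLog,
                           boolean committedLogEmpty, boolean txnLogSyncEnabled) {
        if (committedLogEmpty) {
            // empty committedLog: use the leader's last processed zxid as both bounds
            minCommittedLog = lastProcessedZxid;
            maxCommittedLog = lastProcessedZxid;
        }
        // counter 0 is the first zxid of an epoch: the peer has no txn log in that epoch
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        if (forceSnapSync) {
            return SyncMode.SNAP;                                   // case 1
        } else if (lastProcessedZxid == peerLastZxid) {
            return SyncMode.EMPTY_DIFF;                             // case 2
        } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            return SyncMode.TRUNC;                                  // case 3
        } else if (maxCommittedLog >= peerLastZxid && minCommittedLog <= peerLastZxid) {
            return SyncMode.DIFF_FROM_COMMITTED_LOG;                // case 4
        } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
            return SyncMode.DIFF_FROM_TXNLOG;                       // case 5, assuming no gap
        }
        return SyncMode.SNAP; // unhandled scenario: fall back to a snapshot
    }
}
```

In the real method, case 5 can still end in a snapshot when the txn log is too old or does not meet committedLog.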
Back in LearnerHandler's main flow, let's see what happens once it knows whether the follower must be restored from a snapshot:
// syncFollower, analyzed above, returns whether the follower must be restored from a snapshot
// if needSnap is false, the leader has already queued the DIFF/TRUNC packets to replay in queuedPackets
boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
// syncs between followers and the leader are exempt from throttling because it
// is important to keep the state of quorum servers up-to-date. The exempted syncs
// are counted as concurrent syncs though
//Syncs to observers, by contrast, are throttled
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
//needSnap is true
// syncThrottler limits the number of concurrent snapshot syncs
syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
try {
//Get the latest zxid the leader has processed
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
//Send the follower the SNAP sync command
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
messageTracker.trackSent(Leader.SNAP);
bufferedOutput.flush();
LOG.info(
"Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
+ "send zxid of db as 0x{}, {} concurrent snapshot sync, "
+ "snapshot sync was {} from throttle",
Long.toHexString(peerLastZxid),
Long.toHexString(leaderLastZxid),
Long.toHexString(zxidToSend),
syncThrottler.getSyncInProgress(),
exemptFromThrottle ? "exempt" : "not exempt");
// Dump data to peer
//The leader sends its local database to the follower
learnerMaster.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
//Flush the data to the network
bufferedOutput.flush();
} finally {
ServerMetrics.getMetrics().SNAP_COUNT.add(1);
}
} else {
syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
ServerMetrics.getMetrics().DIFF_COUNT.add(1);
}
LOG.debug("Sending NEWLEADER message to {}", sid);
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
//Below, the leader sends the follower its "I am the leader" notification (NEWLEADER) and then waits for the follower's ack
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
// Start thread that blast packets in the queue to learner
//A separate thread sends the packets buffered in queuedPackets to the follower
startSendingPackets();
//Read the ack sent back by the follower
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.ACK) {
LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
return;
}
LOG.debug("Received NEWLEADER-ACK message from {}", sid);
//Wait until a quorum of participants has acknowledged the leader's NEWLEADER;
//at this point the leader's own QuorumPeer thread is also waiting for the followers to acknowledge its NEWLEADER
learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
syncLimitCheck.start();
// sync ends when NEWLEADER-ACK is received
syncThrottler.endSync();
if (needSnap) {
ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
} else {
ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
}
syncThrottler = null;
// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(learnerMaster.syncTimeout());
/*
* Wait until learnerMaster starts up
*/
//Once a quorum of participants has acknowledged the leader via NEWLEADER, the LearnerHandler waits for the leader's transaction-processing engine to start
learnerMaster.waitForStartup();
//Once the leader's server has started, the code that follows handles the actual requests; we'll analyze it later
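The call to `waitForNewLeaderAck` boils down to a simple barrier. Each LearnerHandler thread records its follower's ack and then blocks until a majority of voters, the leader included, have acked or a timeout expires. The sketch below shows that idea with plain `synchronized`/`wait`/`notifyAll`. `NewLeaderAckBarrier` and `ackAndAwait` are hypothetical names, and a simple majority over a fixed voter set is assumed. The real method's extra checks are left out: it verifies the acked zxid and supports non-majority quorum verifiers. In the ZooKeeper source the timeout is derived from `initLimit * tickTime`, and the leader acks for itself through the same call; here the leader is pre-counted in the constructor.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical "wait until a quorum has acked NEWLEADER" barrier.
// Assumes a simple majority over a fixed voter set; not ZooKeeper's Leader class.
public class NewLeaderAckBarrier {
    private final Set<Long> voters;
    private final Set<Long> acked = new HashSet<>();
    private boolean quorumFormed = false;

    public NewLeaderAckBarrier(Set<Long> voters, long leaderSid) {
        this.voters = Set.copyOf(voters);
        if (this.voters.contains(leaderSid)) {
            acked.add(leaderSid);                // the leader counts as having acked itself
        }
    }

    private boolean hasMajority() {
        return acked.size() > voters.size() / 2;
    }

    /** Called by each handler thread; returns once a quorum has acked, or throws on timeout. */
    public synchronized void ackAndAwait(long sid, long timeoutMs) throws InterruptedException {
        if (quorumFormed) {
            return;
        }
        if (voters.contains(sid)) {              // non-voters (observers) do not count
            acked.add(sid);
        }
        if (hasMajority()) {
            quorumFormed = true;
            notifyAll();                         // release handlers that are already waiting
            return;
        }
        long end = System.currentTimeMillis() + timeoutMs;
        long now = System.currentTimeMillis();
        while (!quorumFormed && now < end) {     // loop guards against spurious wakeups
            wait(end - now);
            now = System.currentTimeMillis();
        }
        if (!quorumFormed) {
            throw new InterruptedException("timeout waiting for NEWLEADER quorum");
        }
    }

    public synchronized boolean isQuorumFormed() {
        return quorumFormed;
    }
}
```

In a three-voter ensemble, the first follower ack already forms a quorum together with the leader. In a five-voter ensemble, the first follower's handler blocks until a second follower acks.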
Follower.syncWithLeader
Now back to the Follower: let's see what happens in syncWithLeader. This method is also very long, so we'll start with the data-synchronization part.
protected void syncWithLeader(long newLeaderZxid) throws Exception {
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
QuorumVerifier newLeaderQV = null;
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
boolean syncSnapshot = false;
//Read the data-sync command sent by the leader
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
//Received a DIFF sync command from the leader
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
self.setSyncMode(QuorumPeer.SyncMode.DIFF);
snapshotNeeded = false;
} else if (qp.getType() == Leader.SNAP) {
//Received a SNAP sync command from the leader
self.setSyncMode(QuorumPeer.SyncMode.SNAP);
LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
// The leader is going to dump the database
// db is clear as part of deserializeSnapshot()
//Rebuild the follower's local zk database by deserializing the snapshot stream sent by the leader
zk.getZKDatabase().deserializeSnapshot(leaderIs);
// ZOOKEEPER-2819: overwrite config node content extracted
// from leader snapshot with local config, to avoid potential
// inconsistency of config node content during rolling restart.
if (!self.isReconfigEnabled()) {
LOG.debug("Reset config node content from local config after deserialization of snapshot.");
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
}
String signature = leaderIs.readString("signature");
if (!signature.equals("BenWasHere")) {
LOG.error("Missing signature. Got {}", signature);
throw new IOException("Missing signature");
}
//Set the last processed zxid of the local database
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
// immediately persist the latest snapshot when there is txn log gap
syncSnapshot = true;
} else if (qp.getType() == Leader.TRUNC) {
//Received a TRUNC command from the leader: remove the invalid transactions
//we need to truncate the log to the lastzxid of the leader
self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
//Delete the invalid transactions (everything after the given zxid)
boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
// not able to truncate the log
LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
} else {
LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
}
//Initialize the value of the /zookeeper/config node in the local zk database
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
//Create the LearnerSessionTracker; I covered what a SessionTracker does in an earlier source analysis
zk.createSessionTracker();
long lastQueued = 0;
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
// we need to make sure that we don't take the snapshot twice.
boolean isPreZAB1_0 = true;
//If we are not going to take the snapshot be sure the transactions are not applied in memory
// but written out to the transaction log
boolean writeToTxnLog = !snapshotNeeded;
TxnLogEntry logEntry;
// we are now going to start getting transactions to apply followed by an UPTODATE
//This loop handles the PROPOSAL/COMMIT messages (INFORM for observers) the leader sends to restore the follower's data, followed by NEWLEADER and finally UPTODATE
outerLoop:
while (self.isRunning()) {
//Wait for the next message from the leader; the loop ends when UPTODATE arrives
readPacket(qp);
switch (qp.getType()) {
case Leader.PROPOSAL:
PacketInFlight pif = new PacketInFlight();
logEntry = SerializeUtils.deserializeTxn(qp.getData());
pif.hdr = logEntry.getHeader();
pif.rec = logEntry.getTxn();
pif.digest = logEntry.getDigest();
if (pif.hdr.getZxid() != lastQueued + 1) {
LOG.warn(
"Got zxid 0x{} expected 0x{}",
Long.toHexString(pif.hdr.getZxid()),
Long.toHexString(lastQueued + 1));
}
lastQueued = pif.hdr.getZxid();
if (pif.hdr.getType() == OpCode.reconfig) {
SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv, true);
}
packetsNotCommitted.add(pif);
break;
//Normally every PROPOSAL is followed by a COMMIT
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
boolean majorChange = self.processReconfig(
qv,
ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
if (!writeToTxnLog) {
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn(
"Committing 0x{}, but next proposal is 0x{}",
Long.toHexString(qp.getZxid()),
Long.toHexString(pif.hdr.getZxid()));
} else {
//Apply the proposal to the local database
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM:
case Leader.INFORMANDACTIVATE:
PacketInFlight packet = new PacketInFlight();
if (qp.getType() == Leader.INFORMANDACTIVATE) {
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
byte[] remainingdata = new byte[buffer.remaining()];
buffer.get(remainingdata);
logEntry = SerializeUtils.deserializeTxn(remainingdata);
packet.hdr = logEntry.getHeader();
packet.rec = logEntry.getTxn();
packet.digest = logEntry.getDigest();
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData()));
boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
} else {
logEntry = SerializeUtils.deserializeTxn(qp.getData());
packet.rec = logEntry.getTxn();
packet.hdr = logEntry.getHeader();
packet.digest = logEntry.getDigest();
// Log warning message if txn comes out-of-order
if (packet.hdr.getZxid() != lastQueued + 1) {
LOG.warn(
"Got zxid 0x{} expected 0x{}",
Long.toHexString(packet.hdr.getZxid()),
Long.toHexString(lastQueued + 1));
}
lastQueued = packet.hdr.getZxid();
}
if (!writeToTxnLog) {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.UPTODATE:
//Received UPTODATE from the leader
LOG.info("Learner received UPTODATE message");
if (newLeaderQV != null) {
boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
if (isPreZAB1_0) {
zk.takeSnapshot(syncSnapshot);
self.setCurrentEpoch(newEpoch);
}
self.setZooKeeperServer(zk);
self.adminServer.setZooKeeperServer(zk);
//Leave the loop: data sync is complete and the local zk service is ready to start
break outerLoop;
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
// means this is Zab 1.0
//Received the leader's NEWLEADER message
LOG.info("Learner received NEWLEADER message");
if (qp.getData() != null && qp.getData().length > 1) {
try {
//Read the cluster configuration (QuorumVerifier) carried in the leader's message
QuorumVerifier qv = self.configFromString(new String(qp.getData()));
self.setLastSeenQuorumVerifier(qv, true);
newLeaderQV = qv;
} catch (Exception e) {
e.printStackTrace();
}
}
if (snapshotNeeded) {
zk.takeSnapshot(syncSnapshot);
}
//Set currentEpoch
self.setCurrentEpoch(newEpoch);
writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;
//Send the ack for the leader's NEWLEADER
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
//Send the ack for the leader's UPTODATE
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
//Start the follower's processing engine
zk.startup();
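The PROPOSAL/COMMIT handling above hinges on `writeToTxnLog`. In the DIFF case it starts as `true`, so proposals and commits are only remembered in `packetsNotCommitted`/`packetsCommitted` and handled later. The part of `syncWithLeader` after `zk.startup()` is not included in this excerpt. In the SNAP and TRUNC cases the flag starts as `false`. Each COMMIT is then applied to the in-memory database until NEWLEADER arrives, the snapshot is taken, and the flag flips. The toy model below shows only that bookkeeping, using plain zxids instead of real packets. `FollowerSyncSketch`, `Msg` and `sync` are hypothetical names, and Java 16+ is assumed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the PROPOSAL/COMMIT bookkeeping in Learner.syncWithLeader.
// Messages are reduced to (type, zxid); no real ZooKeeper classes are used.
public class FollowerSyncSketch {
    enum Type { PROPOSAL, COMMIT, NEWLEADER, UPTODATE }

    record Msg(Type type, long zxid) {}

    final List<Long> appliedInMemory = new ArrayList<>();   // stands in for zk.processTxn(...)
    final Deque<Long> notCommitted = new ArrayDeque<>();    // stands in for packetsNotCommitted
    final Deque<Long> committed = new ArrayDeque<>();       // stands in for packetsCommitted
    int snapshotsTaken = 0;

    void sync(boolean snapshotNeeded, List<Msg> fromLeader) {
        boolean writeToTxnLog = !snapshotNeeded;             // DIFF: remember, don't apply yet
        for (Msg m : fromLeader) {
            switch (m.type()) {
                case PROPOSAL -> notCommitted.add(m.zxid());
                case COMMIT -> {
                    if (!writeToTxnLog) {
                        Long head = notCommitted.peekFirst();
                        if (head != null && head == m.zxid()) {   // commit matches oldest proposal
                            appliedInMemory.add(notCommitted.removeFirst());
                        }
                    } else {
                        committed.add(m.zxid());             // logged and committed later
                    }
                }
                case NEWLEADER -> {
                    if (snapshotNeeded) {
                        snapshotsTaken++;                    // SNAP/TRUNC: persist state first
                    }
                    writeToTxnLog = true;                    // from now on go through the txn log
                }
                case UPTODATE -> {
                    return;                                  // sync done; the server can start
                }
            }
        }
    }
}
```

For a SNAP sync receiving `PROPOSAL 1, COMMIT 1, NEWLEADER, PROPOSAL 2, COMMIT 2, UPTODATE`, zxid 1 is applied in memory before the snapshot. Zxid 2 is only queued.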
Follower processing engine startup
Once the follower has received UPTODATE from the leader, it starts its processing engine: it initializes the request-processor chain and starts the session tracker.
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
syncProcessor.start();
}
Follower handling requests from the leader
Below is the code fragment in followLeader that handles requests from the leader; I'll analyze it when covering how a zk cluster processes requests.
while (this.isRunning()) {
//Read the next message from the leader
readPacket(qp);
//Process the message received from the leader
processPacket(qp);
}
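Leader processing engine startup
Once the leader has received NEWLEADER acks from a quorum, it starts the zk processing engine: it creates the session tracker and initializes the request-processor chain. Below is how the leader's request-processor chain is initialized; I'll cover each processor in detail in the next article, on how zk handles requests.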
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
//Start the manager thread that handles container-type nodes
setupContainerManager();
}
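`setupRequestProcessors` wires the stages as a chain of responsibility. Each processor receives its successor in its constructor, which is why the chain is built from `FinalRequestProcessor` backwards. The sketch below reproduces only that wiring and the resulting order. The `Stage` class and `buildLeaderChain` are hypothetical. In the real server, processors such as `PrepRequestProcessor` and `CommitProcessor` run on their own threads with queues, and writes wait for a quorum of acks before they commit. In this sketch every stage runs synchronously.

```java
import java.util.List;

// Hypothetical sketch of the chain-of-responsibility wiring used by setupRequestProcessors().
// Stage names mirror the leader's chain; each stage only records that it saw the request.
public class ProcessorChainSketch {
    interface RequestProcessor {
        void processRequest(String request);
    }

    // A stage that records its name, then hands the request to the next stage (if any).
    static final class Stage implements RequestProcessor {
        private final String name;
        private final RequestProcessor next;
        private final List<String> trace;

        Stage(String name, RequestProcessor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void processRequest(String request) {
            trace.add(name);
            if (next != null) {
                next.processRequest(request);
            }
        }
    }

    // Built back to front, as in the code above: each stage needs its successor first.
    static RequestProcessor buildLeaderChain(List<String> trace) {
        RequestProcessor fin = new Stage("Final", null, trace);
        RequestProcessor toBeApplied = new Stage("ToBeApplied", fin, trace);
        RequestProcessor commit = new Stage("Commit", toBeApplied, trace);
        RequestProcessor proposal = new Stage("Proposal", commit, trace);
        RequestProcessor prep = new Stage("Prep", proposal, trace);
        return new Stage("LeaderRequest", prep, trace);
    }
}
```

Feeding one request into the returned head visits the stages in the order LeaderRequest, Prep, Proposal, Commit, ToBeApplied, Final. That order is the reverse of construction.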
Leader QuorumPeer
After the leader's processing engine has started, the leader's main thread enters a loop that periodically checks whether each follower is still in sync.
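The check itself is simple. On each round the leader treats a follower as synced if the current tick has not passed that follower's ack deadline, the `tickOfNextAckDeadline` refreshed in the LearnerHandler loop shown next. If the synced followers plus the leader itself no longer form a quorum, the leader gives up leadership. The sketch below assumes a simple-majority quorum. `SyncedQuorumCheck`, `isSynced` and `stillLeading` are hypothetical names, not ZooKeeper's API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the leader's periodic "are enough followers still synced?" check.
// Assumes a simple majority quorum; names are illustrative, not ZooKeeper's API.
public class SyncedQuorumCheck {
    // A follower counts as synced while the current tick has not passed its ack deadline.
    static boolean isSynced(long currentTick, long tickOfNextAckDeadline) {
        return currentTick <= tickOfNextAckDeadline;
    }

    /**
     * @param voters    sids of all voting members (leader included)
     * @param deadlines sid -> tickOfNextAckDeadline for each connected follower
     */
    static boolean stillLeading(long leaderSid, Set<Long> voters,
                                Map<Long, Long> deadlines, long currentTick) {
        Set<Long> synced = new HashSet<>();
        synced.add(leaderSid);                         // the leader is always in sync with itself
        for (Map.Entry<Long, Long> e : deadlines.entrySet()) {
            if (voters.contains(e.getKey()) && isSynced(currentTick, e.getValue())) {
                synced.add(e.getKey());
            }
        }
        return synced.size() > voters.size() / 2;      // otherwise the leader would step down
    }
}
```

With voters `{1, 2, 3}` and deadlines `{2: 10, 3: 4}`, leader 1 keeps its quorum at tick 5 because follower 2 is still synced. It loses the quorum at tick 11.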
LearnerHandler 請(qǐng)求處理
Once the leader has finished starting up, each LearnerHandler enters its request-handling loop; I'll analyze this process in the next article.
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();
packetsReceived.incrementAndGet();
ByteBuffer bb;
long sessionId;
int cxid;
int type;
switch (qp.getType()) {
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
LOG.debug("Received ACK from Observer {}", this.sid);
}
syncLimitCheck.updateAck(qp.getZxid());
learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING:
// Process the touches
ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
learnerMaster.touch(sess, to);
}
break;
case Leader.REVALIDATE:
ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
learnerMaster.revalidateSession(qp, this);
break;
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if (type == OpCode.sync) {
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
learnerMaster.submitLearnerRequest(si);
requestsReceived.incrementAndGet();
break;
default:
LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
break;
}
}
} catch (IOException e) {
if (sock != null && !sock.isClosed()) {
LOG.error("Unexpected exception causing shutdown while sock still open", e);
//close the socket to make sure the
//other side can see it being close
try {
sock.close();
} catch (IOException ie) {
// do nothing
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected exception in LearnerHandler.", e);
} catch (SyncThrottleException e) {
LOG.error("too many concurrent sync.", e);
syncThrottler = null;
} catch (Exception e) {
LOG.error("Unexpected exception in LearnerHandler.", e);
throw e;
} finally {
if (syncThrottler != null) {
syncThrottler.endSync();
syncThrottler = null;
}
String remoteAddr = getRemoteAddress();
LOG.warn("******* GOODBYE {} ********", remoteAddr);
messageTracker.dumpToLog(remoteAddr);
shutdown();
}
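The PING and REQUEST branches above decode fixed binary layouts. A PING carries zero or more (long sessionId, int timeout) pairs. A REQUEST starts with a long sessionId, an int cxid and an int type, followed by the serialized request body. The sketch below pairs a decoder that mirrors that code with a matching encoder, so the layout can be round-tripped. `QuorumPayloadSketch` and its methods are hypothetical. The encoder is inferred from the decoding logic, not copied from the learner side. Java 16+ is assumed for the record.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical encoder/decoder pair mirroring the payload layouts that
// LearnerHandler reads for PING and REQUEST packets. Not ZooKeeper code.
public class QuorumPayloadSketch {
    // PING: zero or more (long sessionId, int timeout) pairs, big-endian.
    static byte[] encodePing(Map<Long, Integer> touches) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        for (Map.Entry<Long, Integer> e : touches.entrySet()) {
            dos.writeLong(e.getKey());
            dos.writeInt(e.getValue());
        }
        dos.flush();
        return bos.toByteArray();
    }

    static Map<Long, Integer> decodePing(byte[] data) throws IOException {
        Map<Long, Integer> touches = new LinkedHashMap<>();
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
        while (dis.available() > 0) {          // same loop shape as the PING case above
            long sess = dis.readLong();
            int to = dis.readInt();
            touches.put(sess, to);
        }
        return touches;
    }

    record RequestHeader(long sessionId, int cxid, int type, ByteBuffer body) {}

    // REQUEST: long sessionId, int cxid, int type, then the serialized request body.
    static byte[] encodeRequest(long sessionId, int cxid, int type, byte[] body) {
        ByteBuffer bb = ByteBuffer.allocate(8 + 4 + 4 + body.length);
        bb.putLong(sessionId).putInt(cxid).putInt(type).put(body);
        return bb.array();
    }

    static RequestHeader decodeRequest(byte[] data) {
        ByteBuffer bb = ByteBuffer.wrap(data);
        long sessionId = bb.getLong();
        int cxid = bb.getInt();
        int type = bb.getInt();
        return new RequestHeader(sessionId, cxid, type, bb.slice()); // remaining bytes = body
    }
}
```

Each PING touch costs 12 bytes (8 for the session id, 4 for the timeout). Both `DataOutputStream` and a fresh `ByteBuffer` are big-endian, so the two encodings agree.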
That completes the analysis of zk Leader and Follower initialization.