In ZooKeeper Source Code Analysis (1): Server Startup we saw that a clustered server runs a leader election during startup. This post walks through the election flow and the subsequent startup of the leader, follower, and observer servers.
Leader Election
First, some election-related terminology:
- SID: the server ID, identical to the value of myid
- ZXID: the transaction ID, which reflects how far the server's transaction state has advanced; the larger the value, the more up to date the server's data
- Vote: the vote object, with the following fields:

final private long id;            // SID of the proposed leader
final private long zxid;          // zxid of the proposed leader
final private long electionEpoch; // logical clock, used to tell whether several votes belong to the same election round; incremented by 1 whenever a new round of voting starts
final private long peerEpoch;     // epoch of the proposed leader
final private ServerState state;  // state of the server that cast the vote
Server states, ServerState:
public enum ServerState {
    LOOKING,   // looking for a leader: a server in this state enters the election flow
    FOLLOWING, // follower: handles non-transactional requests itself, forwards transactional requests to the leader
    LEADING,   // leader
    OBSERVING; // observer: takes no part in elections, handles non-transactional requests itself, forwards transactional requests to the leader
}
- QuorumCnxManager
When a server creates its FastLeaderElection object, it also starts a QuorumCnxManager, which manages the low-level network communication between the servers during leader election. The class maintains a set of queues for received and outgoing messages; on the sending side there is a separate queue per remote server, so sending to one server never interferes with sending to another. Its core fields are:
// receive queue: holds the messages received from other servers
public final ArrayBlockingQueue<Message> recvQueue;
// send queues, grouped by SID: hold the messages waiting to be sent, so that sending to the individual servers stays independent
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
// SendWorker is the message sender; this is the collection of senders, grouped by SID
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
// the most recently sent message, kept per SID
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
QuorumCnxManager creates one SendWorker thread and one RecvWorker thread for each remote server.
- Sending: each SendWorker repeatedly takes a message from its send queue, sends it, and stores it in lastMessageSent. If the queue is empty, it re-sends the last message recorded in lastMessageSent, which covers the case where the receiver never correctly received or processed the previous message (a sketch of this loop follows below).
- Receiving: each RecvWorker keeps reading messages from its TCP connection and appends them to the recvQueue.
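To make the sending rule concrete, here is a minimal sketch of such a worker loop. It is a simplification written for this post, not the real SendWorker: it only shows the "poll the per-SID queue, fall back to lastMessageSent when it is empty" behavior, and send() is a placeholder.

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Simplified sketch of a per-SID sender thread (not the real code).
class SendWorkerSketch extends Thread {
    private final long sid;                                   // remote server id
    private final ArrayBlockingQueue<ByteBuffer> queue;       // send queue for this sid
    private final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
    private volatile boolean running = true;

    SendWorkerSketch(long sid,
                     ArrayBlockingQueue<ByteBuffer> queue,
                     ConcurrentHashMap<Long, ByteBuffer> lastMessageSent) {
        this.sid = sid;
        this.queue = queue;
        this.lastMessageSent = lastMessageSent;
    }

    @Override
    public void run() {
        try {
            // On startup, if nothing is queued, re-send the last message so a
            // peer that missed it gets another chance (duplicates are tolerated).
            ByteBuffer b = lastMessageSent.get(sid);
            if (b != null && queue.isEmpty()) {
                send(b);
            }
            while (running) {
                b = queue.poll(1000, TimeUnit.MILLISECONDS);
                if (b != null) {
                    lastMessageSent.put(sid, b); // remember what we sent last
                    send(b);
                }
            }
        } catch (Exception e) {
            // the real worker logs here and tears down the connection
        }
    }

    private void send(ByteBuffer b) {
        // placeholder: the real worker writes length + payload to the socket
    }
}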
Now let's look at how the connections between servers are created:
private boolean startConnection(Socket sock, Long sid) throws IOException {
    DataOutputStream dout = null;
    DataInputStream din = null;
    try {
        // Use BufferedOutputStream to reduce the number of IP packets. This is
        // important for x-DC scenarios.
        BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
        dout = new DataOutputStream(buf);

        // Sending id and challenge
        // represents protocol version (in other words - message type)
        dout.writeLong(PROTOCOL_VERSION);
        dout.writeLong(self.getId());
        String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
        byte[] addr_bytes = addr.getBytes();
        dout.writeInt(addr_bytes.length);
        dout.write(addr_bytes);
        dout.flush();

        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
    } catch (IOException e) {
        LOG.warn("Ignoring exception reading or writing challenge: ", e);
        closeSocket(sock);
        return false;
    }

    // authenticate learner
    QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
    if (qps != null) {
        // TODO - investigate why reconfig makes qps null.
        authLearner.authenticate(sock, qps.hostname);
    }

    // If lost the challenge, then drop the new connection
    if (sid > self.getId()) {
        LOG.info("Have smaller server identifier, so dropping the " +
                 "connection: (" + sid + ", " + self.getId() + ")");
        closeSocket(sock);
        // Otherwise proceed with the connection
    } else {
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);

        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null)
            vsw.finish();

        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        sw.start();
        rw.start();
        return true;
    }
    return false;
}
Notice the rule applied when a pair of servers connect: only the server with the larger SID may initiate and keep a connection to the other; an initiator with the smaller SID drops its own connection. Symmetrically, in the receiveConnection method a server only accepts incoming connections whose remote SID is larger than its own. Together this avoids duplicate connections between any two servers.
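The pair of rules can be condensed into two predicates. This is an illustrative helper written for this post (ConnectionRule is not a real ZooKeeper class):

// Hypothetical helper illustrating the "larger SID wins" rule that
// startConnection/receiveConnection enforce between each pair of servers.
final class ConnectionRule {
    /** Called by the initiator: keep the outgoing connection only if my SID is larger. */
    static boolean keepOutgoing(long mySid, long remoteSid) {
        return mySid > remoteSid;
    }

    /** Called by the acceptor: keep the incoming connection only if the remote SID is larger. */
    static boolean keepIncoming(long mySid, long remoteSid) {
        return remoteSid > mySid;
    }

    public static void main(String[] args) {
        // Between servers 1 and 3, exactly one TCP connection survives:
        System.out.println(keepOutgoing(3, 1)); // true  -> 3's connection to 1 is kept
        System.out.println(keepOutgoing(1, 3)); // false -> 1's connection to 3 is dropped
        System.out.println(keepIncoming(1, 3)); // true  -> 1 accepts 3's connection
    }
}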
The leader election algorithm is implemented as follows.
The main election method is FastLeaderElection.lookForLeader:
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
        self.start_fle = Time.currentElapsedTime();
    }
    try {
        // records all external votes received by this server in the current election round
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();

        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id = " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid" + n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {
                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        if (termPredicate(recvset, new Vote(n.leader,
                                n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     * Only peer epoch is used to check that the votes come
                     * from the same ensemble. This is because there is at
                     * least one corner case in which the ensemble can be
                     * created with inconsistent zxid and election epoch
                     * info. However, given that only one ensemble can be
                     * running at a single point in time and that each
                     * epoch is used only once, using only the epoch to
                     * compare the votes is sufficient.
                     *
                     * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
                     */
                    outofelection.put(n.sid, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                    if (termPredicate(outofelection, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                            && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state
                            + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}",
                manager.getConnectionThreadCount());
    }
}
The election proceeds in the following steps:
1. Increment the election round
// ensures that all valid votes are counted within the same round
logicalclock.incrementAndGet();
2. Initialize the vote
Before the first round, every server proposes itself as leader:
// updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
// leader is this server's own myid
synchronized void updateProposal(long leader, long zxid, long epoch){
    proposedLeader = leader;
    proposedZxid = zxid;
    proposedEpoch = epoch;
}
3. Send the initial vote
private void sendNotifications() {
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(
                ToSend.mType.notification,      // message type: notification
                proposedLeader,                 // myid of the proposed leader
                proposedZxid,                   // zxid of the proposed leader
                logicalclock.get(),             // current election round
                QuorumPeer.ServerState.LOOKING, // this server's current state
                sid,                            // myid of the destination server
                proposedEpoch,                  // current value of currentEpoch (the currentEpoch file)
                qv.toString().getBytes());      // addresses of the servers taking part in the election
        sendqueue.offer(notmsg);
    }
}
The server offers its own vote to every server participating in the election.
Note: when the FastLeaderElection object is created, its start method is called:
public void start() {
    this.messenger.start();
}

// Starts instances of WorkerSender and WorkerReceiver
void start(){
    this.wsThread.start();
    this.wrThread.start();
}
This starts two threads, wsThread and wrThread, which wrap a WorkerSender and a WorkerReceiver. The WorkerSender continually takes messages from FastLeaderElection.sendqueue and moves each one into the matching per-SID queue in QuorumCnxManager.queueSendMap, from where it goes out on the wire; the WorkerReceiver continually takes messages from QuorumCnxManager.recvQueue and appends them to FastLeaderElection.recvqueue. A sketch of this bridging follows.
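A minimal sketch of the WorkerSender side of that bridge, assuming simplified message and queue types (ToSendSketch and the queue capacity are inventions of this post, not the real classes):

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: drain the election layer's sendqueue and route each message into
// the per-SID queue of the connection layer (QuorumCnxManager).
class WorkerSenderSketch implements Runnable {

    static final class ToSendSketch {
        final long destSid;        // which server the vote is addressed to
        final ByteBuffer payload;  // serialized notification
        ToSendSketch(long destSid, ByteBuffer payload) {
            this.destSid = destSid;
            this.payload = payload;
        }
    }

    private final LinkedBlockingQueue<ToSendSketch> sendqueue;  // FastLeaderElection side
    private final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; // QuorumCnxManager side

    WorkerSenderSketch(LinkedBlockingQueue<ToSendSketch> sendqueue,
                       ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap) {
        this.sendqueue = sendqueue;
        this.queueSendMap = queueSendMap;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                ToSendSketch m = sendqueue.take();
                queueSendMap
                        .computeIfAbsent(m.destSid, sid -> new ArrayBlockingQueue<ByteBuffer>(16))
                        .offer(m.payload);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shut down quietly
        }
    }
}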
Implementation flow chart: (figure from the original post, omitted here)
As long as the server is in the election state ServerState.LOOKING, it keeps going around the election loop.
4. Receive an external vote (Notification n)
If no external vote arrives: when everything previously queued in QuorumCnxManager.queueSendMap has been delivered, the server re-sends its own votes; otherwise it checks the connections and re-establishes any that are missing, so the queued votes can still be delivered.
5. Check the election round (when an external vote has been received)
- If the external vote's round is greater than ours (n.electionEpoch > logicalclock.get()), immediately adopt the newer round with logicalclock.set(n.electionEpoch), clear all votes received so far with recvset.clear(), compare the external vote against a freshly initialized vote of our own, and broadcast the result.
- If the external vote's round is smaller than ours, simply ignore it and go back to step 4.
- If the external vote's round equals ours, move on to the vote comparison ("PK") below; a condensed sketch of this three-way branch follows the list.
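The branch, condensed from the lookForLeader code above (same field names, logic abbreviated into comments):

// Round check in the LOOKING branch of lookForLeader (condensed):
if (n.electionEpoch > logicalclock.get()) {
    logicalclock.set(n.electionEpoch);  // adopt the newer round
    recvset.clear();                    // votes from the stale round no longer count
    // compare n against our freshly initialized vote, then sendNotifications()
} else if (n.electionEpoch < logicalclock.get()) {
    // stale vote: ignore it and keep polling recvqueue (step 4)
} else {
    // same round: totalOrderPredicate decides whether to adopt the external vote
}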
6. Compare votes (PK): FastLeaderElection#totalOrderPredicate
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
The factors are considered in priority order:
1. election epoch, 2. ZXID, 3. SID — at each level the larger value wins. A worked example follows.
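For instance, with two hypothetical votes from the same election epoch (the numbers are made up for illustration):

// Vote A: epoch=1, zxid=0x100000005, sid=1
// Vote B: epoch=1, zxid=0x100000003, sid=3
// Equal epochs, but A has the larger zxid, so A wins even though B's SID is larger:
boolean aBeatsB = totalOrderPredicate(1L, 0x100000005L, 1L,   // new vote:     sid, zxid, epoch
                                      3L, 0x100000003L, 1L);  // current vote: sid, zxid, epoch
// aBeatsB == true; the SID only breaks ties when both epoch and zxid are equal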
7. Update the vote and broadcast the change
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
8. Archive the vote
recvset records every external vote this server has received in the current election round, keyed by SID.
9. Count the votes and update the server state
If termPredicate returns true, more than half of the voting servers have accepted the current internal proposal. The server then keeps draining recvqueue for up to finalizeWait milliseconds to verify that no better vote is on its way: if a vote arrives that beats the current proposal, it is pushed back onto the queue and the election continues from step 4; if nothing better arrives, the proposal is final, and the server sets its state to LEADING if the elected SID is its own, or to the appropriate learning state (FOLLOWING/OBSERVING) otherwise. A sketch of the underlying majority check follows.
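A minimal sketch of the majority check behind termPredicate, assuming a plain majority quorum (QuorumCheckSketch is written for this post; the real code delegates to a QuorumVerifier, which can also implement hierarchical quorums):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Count how many voters back the proposed leader; true once strictly more
// than half of the voting view agrees.
final class QuorumCheckSketch {
    /** votes: SID -> proposed leader SID; voters: all voting SIDs. */
    static boolean hasQuorum(Map<Long, Long> votes, Set<Long> voters, long proposedLeader) {
        Set<Long> ackers = new HashSet<>();
        for (Map.Entry<Long, Long> e : votes.entrySet()) {
            if (e.getValue() == proposedLeader) {
                ackers.add(e.getKey());   // this server agrees with the proposal
            }
        }
        return ackers.size() > voters.size() / 2;   // strictly more than half
    }

    public static void main(String[] args) {
        Map<Long, Long> votes = new HashMap<>();
        votes.put(1L, 3L);   // server 1 votes for 3
        votes.put(2L, 3L);   // server 2 votes for 3
        votes.put(3L, 3L);   // server 3 votes for itself
        Set<Long> voters = new HashSet<>(Arrays.asList(1L, 2L, 3L, 4L, 5L));
        System.out.println(hasQuorum(votes, voters, 3L));   // true: 3 of 5
    }
}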
This completes the election analysis. Once each server knows its role, let's look at how the individual servers start up.
First, the interaction between the Leader and Follower servers during startup: (diagram from the original post, omitted here)
Leader server startup
The main method is Leader.lead():
void lead() throws IOException, InterruptedException {
    // ...... election-time accounting and JMX registration code omitted ......
    try {
        self.tick.set(0);
        zk.loadData();
        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start thread that waits for connection requests from new followers.
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

        synchronized (this) {
            lastProposed = zk.getZxid();
        }

        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        QuorumVerifier curQV = self.getQuorumVerifier();
        if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
            try {
                QuorumVerifier newQV = self.configFromString(curQV.toString());
                newQV.setVersion(zk.getZxid());
                self.setLastSeenQuorumVerifier(newQV, true);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
            newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        // We have to get at least a majority of servers in sync with
        // us. We do this by waiting for the NEWLEADER packet to get
        // acknowledged
        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);

        try {
            waitForNewLeaderAck(self.getId(), zk.getZxid());
        } catch (InterruptedException e) {
            shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                    + newLeaderProposal.ackSetsToString() + " ]");
            HashSet<Long> followerSet = new HashSet<Long>();

            for (LearnerHandler f : getLearners()) {
                if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
                    followerSet.add(f.getSid());
                }
            }
            boolean initTicksShouldBeIncreased = true;
            for (Proposal.QuorumVerifierAcksetPair qvAckset : newLeaderProposal.qvAcksetPairs) {
                if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                    initTicksShouldBeIncreased = false;
                    break;
                }
            }
            if (initTicksShouldBeIncreased) {
                LOG.warn("Enough followers present. " +
                        "Perhaps the initTicks need to be increased.");
            }
            return;
        }

        startZkServer();

        String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
        if (initialZxid != null) {
            long zxid = Long.parseLong(initialZxid);
            zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
        }

        if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
            self.setZooKeeperServer(zk);
        }

        self.adminServer.setZooKeeperServer(zk);

        boolean tickSkip = true;
        // If not null then shutdown this leader
        String shutdownMessage = null;

        while (true) {
            synchronized (this) {
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.tickTime / 2;
                while (cur < end) {
                    wait(end - cur);
                    cur = Time.currentElapsedTime();
                }

                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }

                // We use an instance of SyncedLearnerTracker to
                // track synced learners to make sure we still have a
                // quorum of current (and potentially next pending) view.
                SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
                if (self.getLastSeenQuorumVerifier() != null
                        && self.getLastSeenQuorumVerifier().getVersion() > self
                                .getQuorumVerifier().getVersion()) {
                    syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                }

                syncedAckSet.addAck(self.getId());
                for (LearnerHandler f : getLearners()) {
                    if (f.synced()) {
                        syncedAckSet.addAck(f.getSid());
                    }
                }

                // check leader running status
                if (!this.isRunning()) {
                    // set shutdown flag
                    shutdownMessage = "Unexpected internal error";
                    break;
                }

                if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
                    // Lost quorum of last committed and/or last proposed
                    // config, set shutdown flag
                    shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
                            + syncedAckSet.ackSetsToString() + " ]";
                    break;
                }
                tickSkip = !tickSkip;
            }
            for (LearnerHandler f : getLearners()) {
                f.ping();
            }
        }
        if (shutdownMessage != null) {
            shutdown(shutdownMessage);
            // leader goes in looking state
        }
    } finally {
        zk.unregisterJMX(this);
    }
}
1. Reload the snapshot and transaction log data (zk.loadData()); see ZooKeeper Source Code Analysis (6): Data and Storage.
2. Start the learner acceptor, LearnerCnxAcceptor.
LearnerCnxAcceptor accepts the connection requests of all non-leader servers; these connections carry the cluster's non-election communication.
LearnerCnxAcceptor.run()
public void run() {
    while (!stop) {
        Socket s = ss.accept();
        // start with the initLimit, once the ack is processed
        // in LearnerHandler switch to the syncLimit
        s.setSoTimeout(self.tickTime * self.initLimit);
        s.setTcpNoDelay(nodelay);
        BufferedInputStream is = new BufferedInputStream(s.getInputStream());
        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
        fh.start();
    }
    // ...... exception-handling code omitted ......
}
As you can see, whenever a connection request from another server arrives, a LearnerHandler instance is created. It is responsible for all message exchange and data synchronization between the leader and that server. The first packet it receives from the peer is an OBSERVERINFO or FOLLOWERINFO message; the full set of packet types is described in ZooKeeper inter-server message types.
5. The leader parses the learner's message and computes the new epoch (getEpochToPropose).
The logic: if a learner's accepted epoch is at least as large as the leader's current proposal, then epoch_of_leader = epoch_of_learner + 1. The LearnerHandler then waits until more than half of the voting servers have connected to the leader and reported their epochs; only then is the leader's new epoch settled. A sketch of this negotiation follows.
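A simplified sketch of that negotiation (EpochProposalSketch is written for this post: no timeouts, and a plain majority instead of a QuorumVerifier):

import java.util.HashSet;
import java.util.Set;

// Each connecting peer (including the leader itself) calls propose() with
// its own accepted epoch; everyone blocks until a quorum has connected.
final class EpochProposalSketch {
    private final Set<Long> connectingFollowers = new HashSet<>();
    private final int votingMembers;          // size of the voting view
    private long epoch;                       // starts at the leader's acceptedEpoch
    private boolean waitingForNewEpoch = true;

    EpochProposalSketch(long acceptedEpoch, int votingMembers) {
        this.epoch = acceptedEpoch;
        this.votingMembers = votingMembers;
    }

    synchronized long propose(long sid, long lastAcceptedEpoch) throws InterruptedException {
        if (waitingForNewEpoch) {
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch + 1;   // new epoch is one past the largest seen
            }
            connectingFollowers.add(sid);
            if (connectingFollowers.size() > votingMembers / 2) {
                waitingForNewEpoch = false;      // quorum connected: epoch is settled
                notifyAll();
            } else {
                while (waitingForNewEpoch) {
                    wait();                      // the real code also enforces initLimit here
                }
            }
        }
        return epoch;
    }
}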
6. The leader sends its leader state (LEADERINFO) to the other servers.
LearnerHandler.run
public void run() {
    try {
        leader.addLearnerHandler(this);
        tickOfNextAckDeadline = leader.self.tick.get()
                + leader.self.initLimit + leader.self.syncLimit;

        ia = BinaryInputArchive.getArchive(bufferedInput);
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        oa = BinaryOutputArchive.getArchive(bufferedOutput);

        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");
        if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
            LOG.error("First packet " + qp.toString()
                    + " is not FOLLOWERINFO or OBSERVERINFO!");
            return;
        }

        byte learnerInfoData[] = qp.getData();
        if (learnerInfoData != null) {
            ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
            if (learnerInfoData.length >= 8) {
                this.sid = bbsid.getLong();
            }
            if (learnerInfoData.length >= 12) {
                this.version = bbsid.getInt(); // protocolVersion
            }
            if (learnerInfoData.length >= 20) {
                long configVersion = bbsid.getLong();
                if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
                    throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                }
            }
        } else {
            this.sid = leader.followerCounter.getAndDecrement();
        }

        if (leader.self.getView().containsKey(this.sid)) {
            LOG.info("Follower sid: " + this.sid + " : info : "
                    + leader.self.getView().get(this.sid).toString());
        } else {
            LOG.info("Follower sid: " + this.sid + " not in the current config "
                    + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
        }

        if (qp.getType() == Leader.OBSERVERINFO) {
            learnerType = LearnerType.OBSERVER;
        }

        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

        long peerLastZxid;
        StateSummary ss = null;
        long zxid = qp.getZxid();
        long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
        long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            leader.waitForEpochAck(this.getSid(), ss);
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            bufferedOutput.flush();
            QuorumPacket ackEpochPacket = new QuorumPacket();
            ia.readRecord(ackEpochPacket, "packet");
            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH");
                return;
            }
            ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
            leader.waitForEpochAck(this.getSid(), ss);
        }
        peerLastZxid = ss.getLastZxid();

        // Take any necessary action if we need to send TRUNC or DIFF
        // startForwarding() will be called in all cases
        boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);

        LOG.debug("Sending NEWLEADER message to " + sid);
        // the version of this quorumVerifier will be set by leader.lead() in case
        // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
        // we got here, so the version was set
        if (getVersion() < 0x10000) {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    newLeaderZxid, null, null);
            oa.writeRecord(newLeaderQP, "packet");
        } else {
            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                            .toString().getBytes(), null);
            queuedPackets.add(newLeaderQP);
        }
        bufferedOutput.flush();

        /* if we are not truncating or sending a diff just send a snapshot */
        if (needSnap) {
            boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
            LearnerSnapshot snapshot =
                    leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
            try {
                long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                bufferedOutput.flush();
                // Dump data to peer
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
                bufferedOutput.flush();
            } finally {
                snapshot.close();
            }
        }

        // Start thread that blast packets in the queue to learner
        startSendingPackets();

        /*
         * Have to wait for the first ACK, wait until
         * the leader is ready, and only then we can
         * start processing messages.
         */
        qp = new QuorumPacket();
        ia.readRecord(qp, "packet");
        if (qp.getType() != Leader.ACK) {
            LOG.error("Next packet was supposed to be an ACK,"
                    + " but received packet: {}", packetToString(qp));
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Received NEWLEADER-ACK message from " + sid);
        }
        leader.waitForNewLeaderAck(getSid(), qp.getZxid());

        syncLimitCheck.start();

        // now that the ack has been processed expect the syncLimit
        sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

        /*
         * Wait until leader starts up
         */
        synchronized (leader.zk) {
            while (!leader.zk.isRunning() && !this.isInterrupted()) {
                leader.zk.wait(20);
            }
        }
        // Mutation packets will be queued during the serialize,
        // so we need to mark when the peer can actually start
        // using the data
        //
        LOG.debug("Sending UPTODATE message to " + sid);
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

        while (true) {
            // ...... from here on the leader and this learner are fully synced:
            // the zkServer is up, and this loop processes the normal
            // inter-server packet traffic ......
        }
    }
    // ...... catch/finally omitted ......
}
After the LearnerHandler starts, it sends LEADERINFO to the learner; at that point both the leader thread and the LearnerHandler thread block in the leader.waitForEpochAck(this.getSid(), ss) method.
7. The learner answers with an ACKEPOCH message
Once more than half of the voting servers have replied with ACKEPOCH, the leader starts leader-learner data synchronization: boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
See ZooKeeper Source Code Analysis (6): Data and Storage.
8. The zkServer starts
Once synchronization is under way, the leader thread waits in the waitForNewLeaderAck(self.getId(), zk.getZxid()) method. Whenever a LearnerHandler thread finishes synchronizing with its learner, it sends the learner a NEWLEADER packet, which the learner answers with an ACK. After more than half of the voting servers have ACKed, the leader starts LeaderZooKeeperServer, and each LearnerHandler sends an UPTODATE packet to its synced learner, signalling that synchronization is complete and the learner may start serving clients. The whole handshake is summarized below.
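As a compact reference, the packet flow described above can be written out step by step (the descriptions are this post's summary; the names match the Leader.* packet constants):

// Condensed view of the startup handshake between a learner and its
// LearnerHandler on the leader, in the order the packets are exchanged.
enum SyncHandshakeStep {
    FOLLOWERINFO("learner -> leader: sid + acceptedEpoch (OBSERVERINFO for observers)"),
    LEADERINFO("leader -> learner: the newly negotiated epoch"),
    ACKEPOCH("learner -> leader: current epoch + lastLoggedZxid; leader waits for a quorum"),
    SYNC("leader -> learner: DIFF, TRUNC or SNAP, depending on how far behind the learner is"),
    NEWLEADER("leader -> learner: sync done from the leader's point of view"),
    ACK("learner -> leader: sync acknowledged; leader waits for a quorum, then starts the zkServer"),
    UPTODATE("leader -> learner: the learner may now serve clients");

    final String meaning;
    SyncHandshakeStep(String meaning) { this.meaning = meaning; }
}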
Note: the leader maintains two quorum verifiers:
//last committed quorum verifier
public QuorumVerifier quorumVerifier;
//last proposed quorum verifier
public QuorumVerifier lastSeenQuorumVerifier = null;
In the exchanges with the learners before synchronization, it is always lastSeenQuorumVerifier that gets passed around. My understanding is that this avoids touching the quorumVerifier (and its version) that normal transaction commits rely on. If I've got this wrong, corrections are welcome~
Follower server startup
The main flow is Follower.followLeader():
void followLeader() throws InterruptedException {
    // ......... JMX registration and exception-handling code omitted .........
    QuorumServer leaderServer = findLeader();
    connectToLeader(leaderServer.addr, leaderServer.hostname);
    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
    if (self.isReconfigStateChange())
        throw new Exception("learned about role change");
    // check to see if the leader zxid is lower than ours
    // this should never happen but is just a safety check
    long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
    if (newEpoch < self.getAcceptedEpoch()) {
        LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
        throw new IOException("Error: Epoch of leader is lower");
    }
    syncWithLeader(newEpochZxid);
    QuorumPacket qp = new QuorumPacket();
    while (this.isRunning()) {
        readPacket(qp);
        processPacket(qp);
    }
}
1. Connect and register with the leader, sending a FOLLOWERINFO message.
2. Once the TCP connection is up, the follower receives the LEADERINFO message from the leader and answers with an ACKEPOCH message, via Learner.registerWithLeader(Leader.FOLLOWERINFO):
/**
 * Once connected to the leader, perform the handshake protocol to
 * establish a following / observing connection.
 * @param pktType
 * @return the zxid the Leader sends for synchronization purposes.
 * @throws IOException
 */
protected long registerWithLeader(int pktType) throws IOException {
    /*
     * Send follower info, including last zxid and sid
     */
    long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

    /*
     * Add sid to payload
     */
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());

    writePacket(qp, true);
    readPacket(qp);
    final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    if (qp.getType() == Leader.LEADERINFO) {
        // we are connected to a 1.0 server so accept the new epoch and read the next packet
        leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
        byte epochBytes[] = new byte[4];
        final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
        if (newEpoch > self.getAcceptedEpoch()) {
            wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
            self.setAcceptedEpoch(newEpoch);
        } else if (newEpoch == self.getAcceptedEpoch()) {
            // since we have already acked an epoch equal to the leaders, we cannot ack
            // again, but we still need to send our lastZxid to the leader so that we can
            // sync with it if it does assume leadership of the epoch.
            // the -1 indicates that this reply should not count as an ack for the new epoch
            wrappedEpochBytes.putInt(-1);
        } else {
            throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
        }
        QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
        writePacket(ackNewEpoch, true);
        return ZxidUtils.makeZxid(newEpoch, 0);
    } else {
        if (newEpoch > self.getAcceptedEpoch()) {
            self.setAcceptedEpoch(newEpoch);
        }
        if (qp.getType() != Leader.NEWLEADER) {
            LOG.error("First packet should have been NEWLEADER");
            throw new IOException("First packet should have been NEWLEADER");
        }
        return qp.getZxid();
    }
}
3. Data synchronization starts with syncWithLeader(newEpochZxid); see ZooKeeper Source Code Analysis (6): Data and Storage.
4. When synchronization completes, the follower starts its LearnerZooKeeperServer and initializes the request-processor chain.
Observer server startup
The main flow is Observer.observeLeader():
void observeLeader() throws Exception {
    // ......... JMX registration and exception-handling code omitted .........
    try {
        QuorumServer leaderServer = findLeader();
        connectToLeader(leaderServer.addr, leaderServer.hostname);
        long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
        if (self.isReconfigStateChange())
            throw new Exception("learned about role change");

        syncWithLeader(newLeaderZxid);
        QuorumPacket qp = new QuorumPacket();
        while (this.isRunning()) {
            readPacket(qp);
            processPacket(qp);
        }
    }
    // ...... catch/finally omitted ......
}
1. Connect and register with the leader, sending an OBSERVERINFO message.
2. Once the TCP connection is up, the observer receives the LEADERINFO message from the leader and answers with an ACKEPOCH message, which mainly reports its own lastLoggedZxid and epoch bytes, via Learner.registerWithLeader(Leader.OBSERVERINFO).
3. Data synchronization starts with syncWithLeader(newLeaderZxid); see ZooKeeper Source Code Analysis (6): Data and Storage.
4. When synchronization completes, the observer starts its LearnerZooKeeperServer and initializes the request-processor chain.
From then on, whenever the leader goes down or loses contact with more than half of the followers, the low-level inter-server communication throws exceptions, which break the loops inside Leader.lead() and Follower.followLeader(). Both methods then return into QuorumPeer.run(), where each server closes all of its connections, sets its election state back to LOOKING, and begins a new round of leader election. A sketch of that outer loop follows.
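A simplified sketch of the state machine in QuorumPeer.run() that drives this (reconfig handling, read-only mode, and shutdown checks are omitted; the method and field names follow the real class, but this is an abbreviation, not the actual code):

// Whenever lead()/followLeader()/observeLeader() returns, the peer falls
// back to LOOKING and re-enters leader election.
public void run() {
    while (running) {
        switch (getPeerState()) {
        case LOOKING:
            try {
                setCurrentVote(makeLEStrategy().lookForLeader()); // blocks until a leader is chosen
            } catch (Exception e) {
                setPeerState(ServerState.LOOKING);
            }
            break;
        case FOLLOWING:
            try {
                setFollower(makeFollower(logFactory));
                follower.followLeader();   // returns when the leader is lost
            } catch (Exception e) {
                // log the failure
            } finally {
                follower.shutdown();
                setPeerState(ServerState.LOOKING);
            }
            break;
        case LEADING:
            try {
                setLeader(makeLeader(logFactory));
                leader.lead();             // returns when the quorum is lost
            } catch (Exception e) {
                // log the failure
            } finally {
                setLeader(null);
                setPeerState(ServerState.LOOKING);
            }
            break;
        case OBSERVING:
            try {
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
            } catch (Exception e) {
                // log the failure
            } finally {
                observer.shutdown();
                setPeerState(ServerState.LOOKING);
            }
            break;
        }
    }
}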
Thanks for reading! I'm Monica23334 || Monica2333, the girl who pledged to write one original article a week — follow me and hold me to it~