一:前言
當服務通過選舉算法進行選舉完之后,各個服務器就需要設置自己的角色橱野,并啟動相對應的服務(也就是服務的初始化)朽缴,之后就等待客戶端的請求,處理響應的請求水援。
二:流程分析
2.1密强、 LEADER
功能:接收客戶端的請求, 事務請求的提議者蜗元。
首先我們查看啟動代碼:
// makeLeader(logFactory) 新建一個Leader實體或渤,打開Leader服務器的交換信息的接口,等待與Learner通信
setLeader(makeLeader(logFactory));
// Leader服務啟動的主方法
leader.lead();
Leader服務啟動的主方法leader.lead();
流程分析:
void lead() throws IOException, InterruptedException {
...
//1> 加載FileTxnSnapLog中的數(shù)據(jù)奕扣, 并把每一個事務數(shù)據(jù)封裝成一個Proposal,放入committedLog
// 中薪鹦,并計算minCommittedLog, maxCommittedLog惯豆, 數(shù)據(jù)放入ZKDatabase
//2> 處理事務數(shù)據(jù)時距芬,若事務Type為Session的數(shù)據(jù)涝开,響應的增刪到sessionsWithTimeouts中
//3> 設置Leader的highestZxid
//4> 通過ZKDatabase中的sessions與sessionsWithTimeouts進行比較,Kill失效的session框仔, 并將
// 失效相關聯(lián)的臨時節(jié)點進行刪除
zk.loadData();</br>
// 開啟接收Leaner的連接線程舀武, 并把每一個Leaner的連接封裝成一個LearnerHandler實體, 添加到
// Leader的learners列表中离斩,每一個LearnerHandler開啟一個線程處理響應的Leaner的信息
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();</br>
readyToStart = true;
// 獲取當前選舉的輪次, 同步等待法定人數(shù)的Leaner注冊身份(FOLLOWERINFO/OBSERVERINFO)
// 到Leader 超時時間為 initLimit \* tickTime
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// 根據(jù)當前的輪次银舱,初始化zxid
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));</br>
...</br>
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);</br>
...</br>
//Leaner向Leaner發(fā)送LEADERINFO消息,并等待法定人數(shù)的Leaner的響應epoch的消息
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
try {
//等待法定人數(shù)的Leaner初始化同步數(shù)據(jù)響應的消息跛梗,超時時間為 initLimit \* tickTime
waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
} catch (InterruptedException e) {
...
}
//開啟對客戶端的服務的主方法
startZkServer();
//是否有zxid的初始化設置
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
long zxid = Long.parseLong(initialZxid);
zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
}
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.cnxnFactory.setZooKeeperServer(zk);
}
//對每一個Leaner發(fā)起ping檢測消息寻馏, 檢測的時間間隔為tickTime / 2
// 檢測learnerType = LearnerType.PARTICIPANT的人數(shù)是否少于法定的人數(shù),
// 如果少于核偿,則 shutDown服務诚欠,進行新一輪的選舉
boolean tickSkip = true;
while (true) {
Thread.sleep(self.tickTime / 2);
if (!tickSkip) {
self.tick++;
}
HashSet<Long> syncedSet = new HashSet<Long>();
syncedSet.add(self.getId());
for (LearnerHandler f : getLearners()) {
if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
syncedSet.add(f.getSid());
}
f.ping();
}
if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
shutdown("Not sufficient followers synced, only synced with sids: [ "
+ getSidSetString(syncedSet) + " ]");
return;
}
tickSkip = !tickSkip;
}
//...
}
下面我們隊上面代碼的兩個流程進行流程分析①: Leaner向Leader的注冊同步流程;②:Leader與客戶端服務的流程漾岳。
2.1.1、 Leaner向Leader的注冊同步流程
- 1.尼荆、接收到Leaner的信息注冊消息 type = Leader.FOLLOWERINFO/OBSERVERINFO + acceptedEpoch + myid, 根據(jù)消息設置LeanerHandler的字段值左腔,
this.sid = li.getServerid();this.version = li.getProtocolVersion();
, 同步等待法定人數(shù)Leander的注冊消息得到newEpoch(如果每次接收到的lastAcceptedEpoch >= epoch
捅儒, 則設置epoch = lastAcceptedEpoch+1;
), 發(fā)送Leader信息(type:Leader.LEADERINFO, zxid: newEpoch + 0)給Leander - 2液样、同步等待Leaner接收到Leander的信息包之后返回的ACKEPOCH消息, 從ACKEPOCH消息獲取到Leaner的lastLoggerZxid(為了防止與Leader的lastLoggerZxid沖突巧还, 用peerLastZxid代替)鞭莽。處理同步數(shù)據(jù)邏輯:
- i: 如果peerLastZxid == LastZxid,則packetToSend置為DIFF, zxidToSend置為peerLastZxid
- ii: 如果
proposals.size() != 0
(proposals為committedLog的列表)- a. 子條件
maxCommittedLog >= peerLastZxid && minCommittedLog <= peerLastZxid
麸祷, 比較proposals中最接近且小于等于peerLastZxid的zxid, 如果小于撮抓, packetToSend置為TRUNC, zxidToSend置為此值摇锋; 否則packetToSend置為DIFF, zxidToSend置為maxCommittedLog丹拯, 然后把所有大于peerLastZxid 的propose 封裝成QuorumPacket (type: commit, zxid: propose.packet.getZxid())放入queuedPackets隊列中, - b.子條件peerLastZxid > maxCommittedLog, 則packetToSend置為TRUNC荸恕, zxidToSend置為maxCommittedLog
- a. 子條件
- iii: peerLastZxid > maxCommittedLog乖酬, packetToSend置為TRUNC, zxidToSend置為 maxCommittedLog
- iv: 否則packetToSend置為SNAP
- 3融求、將toBeApplied(ToBeAppliedRequestProcessor未完成的請求)列表中的數(shù)據(jù)(type: Leader.COMMIT)添加到queuedPackets隊列中咬像, 如果
handler.LearnerType() == LearnerType.PARTICIPANT
(即Leander的角色為Follower), 則將outstandingProposals(ProposalRequestProcessor未完成的請求) 中的提議數(shù)據(jù)(type: Leader.PROPOSAL)添加到queuedPackets隊列中, 將Leaner 根據(jù)leanerType分別放入forwardingFollowers 和 observingLearners集合中 - 4县昂、將新一輪的Leader的信息(type: NEWLEADER肮柜, zxid: newEpoch + 0)放入queuedPackets隊列中
- 5、發(fā)送同步信息(type: packetToSend, zxidToSend: zxidToSend), 如果為SNAP消息倒彰, 則zxidToSend為Leader的lastLoggerZxid审洞, 且將ZKDatabase序列化為數(shù)據(jù)流發(fā)送給Leaner
- 6、開啟線程待讳, 發(fā)送queuedPackets 隊列中包給Leander
- 7芒澜、接收來自Leander同步的Ack消息, 同步等待法定人數(shù)Leaner的人數(shù)的回應
- 8创淡、發(fā)送Leader.UPTODATE消息給客戶端痴晦, 表示可以使用數(shù)據(jù)了
<b>注:</b>可以看到queuedPackets隊列中的數(shù)據(jù)的順序為:Leader.COMMIT —> Leader.PROPOSAL——> Leader.NEWLEADER —> Leader.UPTODATE
Leader異步等待法定的客戶端注冊同步:Leader的Lead()方法中:
// 異步等待Leaner信息的注冊
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// 異步等待Leaner接收到Leader的信息的EpochAck
waitForEpochAck(self.getId(), leaderStateSummary);
// 異步等待Leander對Leader的NEWLEADER消息的Ack
waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
2.1.2、 Leader與客戶端服務的流程
zk處理客戶端的請求都是通過Proccessor進行鏈路處理琳彩。對于LeaderZooKeeperServer對應的Proccessor的處理關系為: PrepRequestProcessor ——> ProposalRequestProcessor ——> CommitProcessor ——> Leader.ToBeAppliedRequestProcessor ——>FinalRequestProcessor誊酌, 同時ProposalRequestProcessor 將事務請求(request.hdr != null)交給同步刷盤處理器處理 ProposalRequestProcessor ——> SyncRequestProcessor ——> AckRequestProcessor。AckRequestProcessor處理器將刷盤成功的請求交給Leader作為一個提議露乏,作為Leader判斷提議成功的法定人數(shù)碧浊, 成立交給CommitProcessor
2.2、 FOLLOWER
功能:接收客戶端的請求施无, 事務請求提議的參與者,將事務請求轉發(fā)給Leader必孤。
首先查看啟動代碼
// makeFollower(logFactory)新建一個Follwer實體猾骡,建立與Leader通信
setFollower(makeFollower(logFactory));
// follower啟動的主方法
follower.followLeader();
Follower服務啟動的主方法follower.followLeader();
流程分析:
void followLeader() throws InterruptedException {
...
try {
InetSocketAddress addr = findLeader();
try {
//與Leader建立通信
connectToLeader(addr);
//將自己的信息注冊到Leader
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);</br>
//初始化同步Leader的數(shù)據(jù)
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
syncWithLeader(newEpochZxid);</br>
//接收Leader通信數(shù)據(jù),并做響應的業(yè)務處理
QuorumPacket qp = new QuorumPacket();
while (self.isRunning()) {
readPacket(qp);
processPacket(qp);
}
} catch (Exception e) {
...
}
} finally {
zk.unregisterJMX((Learner)this);
}
}
2.2.1 Follwer注冊與同步流程分析
- 1:發(fā)送自身信息包(type: Leader.FOLLOWERINFO, zxid: acceptedEpoch + 1)給Leader
- 2: 接收到Leader的信息包(type: Leader.LEADERINFO, zxid: newEpoch + 1)敷搪, 判斷newEpoch 與 自身acceptedEpoch 的大行讼搿: 大于, 則將 自身acceptedEpoch設置為 newEpoch赡勘, 發(fā)送的epoch設置為舊的acceptedEpoch嫂便; 相等, 發(fā)送epoch設置為-1闸与; 否則異常毙替。 包(type: Leader.ACKEPOCH, zxid: lastLoggedZxid)
- 3: 接收來自Leader根據(jù)lastLoggedZxid發(fā)送過來的初始化同步信息: Leader.DIFF, 不做任何操作; Leader.SNAP践樱, 則將包中的數(shù)據(jù)序列化作為自己的ZKDatabase厂画; Leader.TRUNC, 則根據(jù)接收到的zxid來TRUNC當前ZKDatabase的數(shù)據(jù)拷邢, 設置
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
- 4: 接收來自Leader的同步數(shù)據(jù): Leader.COMMIT——> Leader.PROPOSAL——> Leader.NEWLEADER ——> Leader.UPTODATE
Leader.PROPOSAL: 提議的數(shù)據(jù)袱院, 添加到packetsNotCommitted 隊列中
Leader.COMMIT: 需要提交的數(shù)據(jù),添加到packetsCommitted隊列
Leader.NEWLEADER: 接收新的Leader領導的事務,保存快照數(shù)據(jù)忽洛, 設置self.setCurrentEpoch(newEpoch);
, 回復AcK消息給Leader
Leader.UPTODATE腻惠, 同步完成,設置FollowerZooKeeperServer(self.cnxnFactory.setZooKeeperServer(zk)
)欲虚, 跳出同步流程 - 5: 對packetsNotCommitted 中的數(shù)據(jù)調用
fzk.logRequest(p.hdr, p.rec)
(刷盤集灌, 返回提議AcK),對packetsCommitted中的數(shù)據(jù)調用fzk.commit(zxid);
(提交操作)
2.2.1 Follwer與客戶端的服務流程
對于角色Follwer的處理客戶端的請求是通過下面的RequestProcessor進行處理: FollowerRequestProcessor ——> CommitProcessor ——> FinalRequestProcessor; 同時將其轉發(fā)給Leader是并把請求苍在,Leader會進行提議請求绝页,將接收到的提議請求交給SyncRequestProcessor ——> SendAckRequestProcessor, SendAckRequestProcessor將ack消息發(fā)送給Leader,作為Leader判斷提議成功的法定人數(shù)寂恬,成立交給CommitProcessor 续誉。
2.3、 OBSERVER
功能:接收客戶端的請求初肉, 將事務請求轉發(fā)給Leader酷鸦。
首先查看啟動代碼
// makeObserver(logFactory)新建一個Observer實體,建立與Leader通信
setObserver(makeObserver(logFactory));
// observer啟動的主方法
observer.observeLeader();
Observer服務啟動的主方法observer.observeLeader();
流程分析:
void observeLeader() throws InterruptedException {
...
try {
//建立與Leader的連接
connectToLeader(addr);
//將自身的信息注冊到Leader中牙咏, 返回Leader的lastLoggerZxid
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
//同步Leader的數(shù)據(jù)信息
syncWithLeader(newLeaderZxid);
//接收Leader通信數(shù)據(jù)臼隔,并做響應的業(yè)務處理
QuorumPacket qp = new QuorumPacket();
while (self.isRunning()) {
readPacket(qp);
processPacket(qp);
}
} catch (Exception e) {
...
}
...
}
3.3.1 Observer注冊與同步流程分析
- 1:發(fā)送自身信息包(type: Leader.OBSERVERINFO, zxid: acceptedEpoch + 1)給Leader
- 2: 接收到Leader的信息包(type: Leader.LEADERINFO, zxid: newEpoch + 1), 判斷newEpoch 與 自身acceptedEpoch 的大型: 大于摔握, 則將 自身acceptedEpoch設置為 newEpoch, 發(fā)送的epoch設置為舊的acceptedEpoch丁寄; 相等氨淌, 發(fā)送epoch設置為-1; 否則異常伊磺。 包(type: Leader.ACKEPOCH, zxid: lastLoggedZxid)
- 3: 接收來自Leader根據(jù)lastLoggedZxid發(fā)送過來的初始化同步信息: Leader.DIFF, 不做任何操作盛正; Leader.SNAP, 則將包中的數(shù)據(jù)序列化作為自己的ZKDatabase屑埋; Leader.TRUNC豪筝, 則根據(jù)接收到的zxid來TRUNC當前ZKDatabase的數(shù)據(jù), 設置
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
- 4: 接收來自Leader的同步數(shù)據(jù): Leader.COMMIT——> Leader.NEWLEADER ——> Leader.UPTODATE
Leader.COMMIT: 需要提交的數(shù)據(jù)摘能,添加到packetsCommitted隊列
Leader.NEWLEADER: 接收新的Leader領導的事務续崖,保存快照數(shù)據(jù), 設置self.setCurrentEpoch(newEpoch);
, 回復AcK消息給Leader
Leader.UPTODATE团搞, 同步完成袜刷,設置FollowerZooKeeperServer(self.cnxnFactory.setZooKeeperServer(zk)
), 跳出同步流程
3.3.2 Observer與客戶端的服務流程
對于Observer的處理客戶端的請求是通過下面的RequestProcessor進行處理:ObserverRequestProcessor ——> CommitProcessor ——> FinalRequestProcessor ; 同時對于Leader的Leader.INFORM消息會同時交給SyncRequestProcessor(刷盤操作)跟CommitProcessor莺丑。