1 集群版服务器启动流程
2 源码解读
2.1
- 执行 QuorumPeerMain 的 main 方法，其中先创建一个 QuorumPeerMain 对象；
- 调用 initializeAndRun 方法：当满足条件 args.length == 1 && config.servers.size() > 0（即只传入一个启动参数（配置文件），且配置中包含 server 列表）时，该方法走的是集群模式的 runFromConfig(config)。
config.servers 的配置示例：
server.1=node1:12888:13888
server.2=node2:12888:13888
server.3=node3:12888:13888
其中 12888 是 leader 与 learner 之间通信使用的端口；
13888 是 FastLeaderElection（快速 Leader 选举）使用的端口。
因此，最终调用的是 QuorumPeerMain 的 runFromConfig 方法。
2.2 QuorumPeerMain 中的 runFromConfig
该方法对 QuorumPeer 进行一系列属性设置，然后启动它：
// Excerpt from QuorumPeerMain.runFromConfig: builds and configures a
// QuorumPeer from the parsed config, then calls quorumPeer.start()
// followed by quorumPeer.join().
try {
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
// Construct the QuorumPeer object
quorumPeer = getQuorumPeer();
quorumPeer.setQuorumPeers(config.getServers());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
// Set the leader election algorithm type
quorumPeer.setElectionType(config.getElectionAlg());
// A myid file must exist in the data directory to identify this server's id
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
quorumPeer.setCnxnFactory(cnxnFactory);
// Quorum verifier: decides whether a given set of servers, out of the
// configured server list, is able to form a quorum
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
// The SASL-specific settings below are applied only when quorum SASL is enabled
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
// Call start(), then join()
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
// NOTE(review): the interrupt status is not restored here
// (no Thread.currentThread().interrupt()); confirm this is intended.
LOG.warn("Quorum Peer interrupted", e);
}
2.3 调用 quorumPeer 的 start 方法
/**
 * Starts this quorum peer.
 *
 * <p>Steps, in order: load the in-memory database, start the connection
 * factory, start leader election, then invoke the superclass start().
 */
@Override
public synchronized void start() {
    // Load the in-memory database (snapshot restore path) before anything else.
    this.loadDataBase();
    this.cnxnFactory.start();
    this.startLeaderElection();
    super.start();
}
loadDataBase() 方法：
/**
 * Loads the on-disk database into memory, then reconciles the persisted
 * current-epoch and accepted-epoch files with the epoch of the last
 * processed zxid.
 *
 * <p>If either epoch file is missing, it is created using the zxid's epoch.
 * If the updating-epoch marker file exists and the zxid's epoch is newer
 * than the stored current epoch, the current epoch is advanced and the
 * marker file is deleted.
 *
 * @throws RuntimeException wrapping any IOException. This includes the
 *         sanity-check failures: the current epoch is older than the zxid's
 *         epoch, or the accepted epoch is less than the current epoch.
 */
private void loadDataBase() {
// Marker file in the snapshot directory. Per the log message below, its presence
// means the server stopped after taking a snapshot but before updating the current epoch.
File updating = new File(getTxnFactory().getSnapDir(),
UPDATING_EPOCH_FILENAME);
try {
// Same as in standalone mode: load through the snapshot restore logic
zkDb.loadDataBase();
// Last processed transaction id (zxid) in the in-memory DataTree
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
// Derive the epoch from that zxid
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
// If the current-epoch file exists, read it and compare it with epochOfZxid
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
if (epochOfZxid > currentEpoch && updating.exists()) {
LOG.info("{} found. The server was terminated after " +
"taking a snapshot but before updating current " +
"epoch. Setting current epoch to {}.",
UPDATING_EPOCH_FILENAME, epochOfZxid);
setCurrentEpoch(epochOfZxid);
if (!updating.delete()) {
throw new IOException("Failed to delete " +
updating.toString());
}
}
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
LOG.info(CURRENT_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}
// Sanity check: the persisted current epoch must not be behind the loaded data
if (epochOfZxid > currentEpoch) {
throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
}
try {
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
acceptedEpoch = epochOfZxid;
LOG.info(ACCEPTED_EPOCH_FILENAME
+ " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
acceptedEpoch);
writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
}
// Sanity check: the accepted epoch must be >= the current epoch
if (acceptedEpoch < currentEpoch) {
throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
}
} catch(IOException ie) {
// Any I/O failure above is fatal for the quorum server
LOG.error("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
}
cnxnFactory.start()：与单机版一致；
startLeaderElection()：开始 Leader 选举，快速选举方法（FastLeaderElection）的实现见另外的文章；
最后調(diào)用QuorumPeer的start方法,實(shí)際上就是調(diào)用其run方法匾嘱。