每探索一門新技術(shù)的時(shí)候审孽,我們都會從方法的入口開始探索打颤,對于zookeeper也一樣瘸洛,zookeeper在啟動(dòng)時(shí)候是通過QuorumPeerMain來作為啟動(dòng)入口類。我們有必要知道在啟動(dòng)類啟動(dòng)時(shí),zookeeper做了哪些初始化和準(zhǔn)備工作。
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
//zookeeper啟動(dòng)入口,初始化參數(shù)配置并啟動(dòng)zookeeper
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
LOG.error("Invalid arguments, exiting abnormally", e);
System.exit(2);
} catch (ConfigException e) {
LOG.error("Invalid config, exiting abnormally", e);
System.exit(2);
} catch (DatadirException e) {
LOG.error("Unable to access datadir, exiting abnormally", e);
System.exit(3);
} catch (AdminServerException e) {
LOG.error("Unable to start AdminServer, exiting abnormally", e);
System.exit(4);
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
System.exit(1);
}
LOG.info("Exiting normally");
System.exit(0);
}
我們看到main方法里通過調(diào)用initializeAndRun開始進(jìn)行初始化配置和啟動(dòng)憎亚,所以我們跟進(jìn)去看下該方法的具體行為是做了什么陆爽?
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
// 1贡必、解析zoo.cfg配置文件利花,讀取配置文件中的KEY-value鍵值對挠乳,初始化QuorumPeerConfig
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
//創(chuàng)建文件清理管理器,該清理器主要負(fù)責(zé)定期清理內(nèi)存快照文件和日志文件
//snapRetainCount:至少保留文件個(gè)數(shù)
//purgeInterval:定時(shí)任務(wù)間隔時(shí)間马靠,只有purgeInterval大于0第晰,該文件清理器才會開啟文件清理的定時(shí)任務(wù)
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
//quorumVerifier!=null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
//上面表示如果是集群配置的話,服務(wù)器將會集群方式啟動(dòng)
/**zoo.cfg集群配置servers配置如下:
servers.1=127.0.0.1:2287:3387
servers.2=127.0.0.1:2288:3388
servers.3=127.0.0.1:2289:3389
***/
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);//集群方式啟動(dòng)
} else {
ZooKeeperServerMain.main(args);//單機(jī)版啟動(dòng)
}
}
然后我們看下羽历,當(dāng)zookeeper以集群方式啟動(dòng)時(shí)疏尿,具體做了哪些準(zhǔn)備工作敌呈?
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
try {
//注冊日志管理(不用管)
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
//我們可以在zoo.cfg里通過clientPortAddress屬性來配置线脚,如果沒有配置clientPortAddress的話,默認(rèn)采用clientPort端口地址
if (config.getClientPortAddress() != null) {
//創(chuàng)建ServerCnxnFactory實(shí)例,ServerCnxnFactory主要負(fù)責(zé)跟客戶端進(jìn)行網(wǎng)絡(luò)通信抽诉,接收客戶端網(wǎng)絡(luò)請求唉窃,比如提交事務(wù)等矮嫉,默認(rèn)采用NIO昨寞。
// 可以通過配置系統(tǒng)參數(shù)zookeeper.serverCnxnFactory來配置它實(shí)際實(shí)現(xiàn)的方式
cnxnFactory = ServerCnxnFactory.createFactory();
//根據(jù)端口號和maxClientCnxns初始化配置ServerCnxnFactory 羽峰,這兩個(gè)參數(shù)都可以通過zoo.cfg配置,分別是:
//clientPortAddress||ClientPort:客戶端發(fā)送請求的端口
//maxClientCnxns:最大并發(fā)數(shù),用來控制客戶端高并發(fā)
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(),false);
}
//和ServerCnxnFactory 一樣鼻种,通過在ZOO.CFG中配置secureClientPortAddress或secureClientPort篙贸,可都不配置
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}
quorumPeer = getQuorumPeer();
//創(chuàng)建內(nèi)存快照文件和事務(wù)文件日志文件的管理器FileTxnSnapLog爵川,該管理器提供了zookeeper上層服務(wù)跟底層數(shù)據(jù)庫存儲的對接入口碟案,
// 他提供了一系列接口扮叨,用來訪問日志文件和內(nèi)存快站文件(具體提供了哪些訪問底層存儲文件的方法,我們后面會單獨(dú)抽出來探究一番)
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//配置選舉算法薪贫,不過現(xiàn)在一般都不配瞧省,默認(rèn)采用3
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());//設(shè)置心跳時(shí)間
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());//設(shè)置最小超時(shí)時(shí)間
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());//設(shè)置最小超時(shí)時(shí)間
quorumPeer.setInitLimit(config.getInitLimit());//初時(shí)心跳次數(shù)
quorumPeer.setSyncLimit(config.getSyncLimit());//同步心跳次數(shù)
quorumPeer.setConfigFileName(config.getConfigFilename());
//創(chuàng)建內(nèi)存數(shù)據(jù)庫ZKDatabase實(shí)例置森,創(chuàng)建該數(shù)據(jù)庫內(nèi)存實(shí)例時(shí)會注入一個(gè)DataTree建瘫,
// DataTree為內(nèi)存數(shù)據(jù)庫內(nèi)真正作為保存內(nèi)存數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu),維護(hù)了整個(gè)內(nèi)存數(shù)據(jù)庫中節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu)泪勒,
// 同時(shí)在創(chuàng)建DataTree時(shí)候會初始化創(chuàng)建根路徑/zookeeper和配額管理節(jié)點(diǎn)/zookeeper/quota
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
//初始化QuorumPeer的一些其它屬性,包括quorumCnxnThreadsSize(線程池QuerumServer manager線程池的初始線程數(shù))
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
...
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
//
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
//啟動(dòng)quorumPeer并開始投票選舉
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
我們再來看下它是如何創(chuàng)建ServerCnxnFactory呢?
//創(chuàng)建ServerCnxnFactory實(shí)例由捎,ServerCnxnFactory主要負(fù)責(zé)跟客戶端進(jìn)行網(wǎng)絡(luò)通信邻奠,接收客戶端網(wǎng)絡(luò)請求贰镣,比如提交事務(wù)等,默認(rèn)采用NIO。
// 可以通過配置系統(tǒng)參數(shù)zookeeper.serverCnxnFactory來配置它實(shí)際實(shí)現(xiàn)的方式
static public ServerCnxnFactory createFactory() throws IOException {
//獲取系統(tǒng)參數(shù)zookeeper.serverCnxnFactory配置
String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {//沒有配置則默認(rèn)采用NIOServerCnxnFactory
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
//創(chuàng)建NIOServerCnxnFactory
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
}
我么再看下quorumPeer.start做了哪些工作
public synchronized void start() {
//判斷集群配置是否包含本機(jī)myid
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
//恢復(fù)內(nèi)存數(shù)據(jù)庫的數(shù)據(jù)
loadDataBase();
//啟動(dòng)網(wǎng)絡(luò)通信服務(wù)瞄勾,并監(jiān)聽客戶端的事務(wù)請求
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
//開始進(jìn)行l(wèi)eader選舉
startLeaderElection();
//啟動(dòng)當(dāng)前線程(QuorumPeer本身也是一個(gè)線程).具體run做了哪些,我們后面會單獨(dú)討論,只需要知道它主要用來統(tǒng)計(jì)票數(shù),更新節(jié)點(diǎn)服務(wù)器的狀態(tài)
super.start();
}
總結(jié)
通過上面的代碼弥激,大概可以知道进陡,zookeeper服務(wù)器在啟動(dòng)時(shí)候主要做了以下工作:
1、解析zoo.cfg配置文件微服,并初始化QuorumPeerConfig.
2趾疚、創(chuàng)建文件清理器,根據(jù)配置參數(shù)purgeInterval決定是否開啟任務(wù)定時(shí)清理內(nèi)存快照文件和日志文件以蕴。
3糙麦、創(chuàng)建并開啟用于和客戶端進(jìn)行網(wǎng)絡(luò)通信的ServerCnxnFactory,默認(rèn)是NIO的方式丛肮。
4赡磅、創(chuàng)建日志快照和內(nèi)存快照的管理器FileTxnSnapLog,用來訪問日志文件和內(nèi)存快站文件宝与。
5焚廊、創(chuàng)建并初始化內(nèi)存數(shù)據(jù)庫ZKDatabase
6、恢復(fù)內(nèi)存數(shù)據(jù)庫數(shù)據(jù)
7习劫、開始執(zhí)行l(wèi)eader選舉咆瘟。