從服務端啟動腳本可以看到啟動類為org.apache.zookeeper.server.quorum.QuorumPeerMain
,在這個類注釋了關于啟動程序的配置文件
第一個參數(shù)為配置文件冗茸,用于獲取配置信息联喘。該文件是一個屬性文件索昂,因此鍵和值由equals(=)分隔,鍵/值對由換行分隔。下面是配置文件中使用的鍵的一般摘要契耿。
- dataDir - The directory where the ZooKeeper data is stored.
- dataLogDir - The directory where the ZooKeeper transaction log is stored.
- clientPort - The port used to communicate with clients.
- tickTime - The duration of a tick in milliseconds. This is the basic unit of time in ZooKeeper.
- initLimit - The maximum number of ticks that a follower will wait to initially synchronize with a leader.
- syncLimit - The maximum number of ticks that a follower will wait for a message (including heartbeats) from the leader.
- server.id - This is the host:port[:port] that the server with the given id will use for the quorum protocol.
除了配置文件。數(shù)據(jù)目錄中有一個名為“myid”的文件螃征,其中包含服務器id作為ASCII十進制值搪桂。
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
}
initializeAndRun方法中初始化并啟動
protected void initializeAndRun(String[] args) {
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
//解析配置文件zoo.cfg
config.parse(args[0]);
}
// 創(chuàng)建并啟動歷史文件清理器DatadirCleanupManager
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
// 判斷是分布式還是單機
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("。。");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
- 解析配置文件
zoo.cfg
- 創(chuàng)建并啟動歷史文件清理器DatadirCleanupManager
- 判斷當前是集群模式還是單機模式的啟動
先分析單機模式踢械,調用ZooKeeperServerMain.main(args);
酗电,這個類啟動并運行一個獨立的ZooKeeperServer。
-
注冊MBean
ManagedUtil.registerLog4jMBeans();
-
解析ServerConfig
ServerConfig config = new ServerConfig(); config.parse(args[0]);
-
啟動
runFromConfig(config);
進入runFromConfig方法后
-
創(chuàng)建ZooKeeperServer實例
final ZooKeeperServer zkServer = new ZooKeeperServer();
實例化FileTxnSnapLog内列,txnlog和snapshot的助手類
txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
-
設置zkServer屬性
zkServer.setTxnLogFactory(txnLog); zkServer.setTickTime(config.tickTime); zkServer.setMinSessionTimeout(config.minSessionTimeout); zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
-
創(chuàng)建cnxnFactory撵术,配置并啟動,默認為NIOServerCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); cnxnFactory.startup(zkServer);
configure配置連接工廠
public void configure(InetSocketAddress addr, int maxcc) throws IOException { configureSaslLogin(); thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr); thread.setDaemon(true); maxClientCnxns = maxcc; this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false); ss.register(selector, SelectionKey.OP_ACCEPT); }
- 創(chuàng)建一個后臺線程话瞧,傳參為this嫩与,也就是說thread線程將執(zhí)行cnxnFactory的方法
- 打開ServerSocketChannel,監(jiān)聽端口
-
主線程阻塞并等待信號交排,優(yōu)雅關機
shutdownLatch.await(); shutdown(); //主線程等待IO線程執(zhí)行完成 cnxnFactory.join(); if (zkServer.canShutdown()) { zkServer.shutdown(true); }
第8步并不是主流程划滋,而是等待shutdownLatch的信號,執(zhí)行關機操作
-
進入NIOServerCnxnFactory的startup方法
start(); setZooKeeperServer(zks); zks.startdata(); zks.startup();
start方法啟動接收連接的線程
thread.start();
在第7步說了埃篓,該thread將執(zhí)行cnxnFactory的run方法处坪,run方法內部就是接收連接、處理新連接架专,也是常規(guī)的NIO操作
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
// 1 這個是對同一個ip限制最大連接樹同窘,配置文件可配
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
// 2 為每個客戶端channel創(chuàng)建一個NIOServerCnxn,并把cnxn放到SelectionKey的attachment上
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// 3 讀寫部脚,取出attachment想邦,處理IO
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select " + k.readyOps());
}
}
}
selected.clear();
- 新連接對同一個ip限制最大連接樹,配置文件可配
- 新連接為每個客戶端channel創(chuàng)建一個NIOServerCnxn睛低,并把cnxn放到SelectionKey的attachment上
- 讀寫操作案狠,取出上面的attachment,也就是NIOServerCnxn钱雷,處理IO
關于請求處理流程這里不分析
-
回到第9步骂铁,
zks.startdata();
恢復數(shù)據(jù),重點關注一下
內部調用loadData
方法罩抗,恢復session和節(jié)點數(shù)據(jù)public void loadData() { // 設置Zxid拉庵,這個在集群選舉的時候很重要 if(zkDb.isInitialized()){ setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { // 未初始化,加載數(shù)據(jù)并設置Zxid setZxid(zkDb.loadDataBase()); } // 清理過期session LinkedList<Long> deadSessions = new LinkedList<Long>(); for (Long session : zkDb.getSessions()) { if (zkDb.getSessionWithTimeOuts().get(session) == null) { deadSessions.add(session); } } zkDb.setDataTreeInit(true); for (long session : deadSessions) { // XXX: Is lastProcessedZxid really the best thing to use? killSession(session, zkDb.getDataTreeLastProcessedZxid()); } }
ZKDatabase#loadDataBase
方法內部通過調用FileTxnSnapLog#restore
恢復數(shù)據(jù)套蒂,這里面有一些zk關于數(shù)據(jù)存儲和恢復的機制需要關注下钞支。/** * 這個函數(shù)在讀取快照和事務日志之后恢復服務器數(shù)據(jù)庫 */ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { // 1 snapLog.deserialize(dt, sessions); // 2 return fastForwardFromEdits(dt, sessions, listener); }
- 從最后一個有效快照反序列化數(shù)據(jù)樹,并返回最后一個反序列化的zxid
- 對服務器數(shù)據(jù)庫進行快進處理操刀,使其包含最新的事務烁挟。這與恢復相同,但只從事務日志中讀取骨坑,而不從快照中恢復撼嗓。
簡單來說就是先通過snapshot恢復數(shù)據(jù)柬采,然后從log中補充缺失的數(shù)據(jù)。
進到
FileSnap#deserialize
方法public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { // 查找最新的且警、有效的100個snapshot文件 List<File> snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } File snap = null; boolean foundValid = false; for (int i = 0; i < snapList.size(); i++) { snap = snapList.get(i); InputStream snapIS = null; CheckedInputStream crcIn = null; try { LOG.info("Reading snapshot " + snap); snapIS = new BufferedInputStream(new FileInputStream(snap)); crcIn = new CheckedInputStream(snapIS, new Adler32()); InputArchive ia = BinaryInputArchive.getArchive(crcIn); deserialize(dt,sessions, ia); long checkSum = crcIn.getChecksum().getValue(); long val = ia.readLong("val"); if (val != checkSum) { throw new IOException("CRC corruption in snapshot : " + snap); } // 已恢復有效的snapshot文件 foundValid = true; break; } catch(IOException e) { LOG.warn("problem reading snap file " + snap, e); } finally { if (snapIS != null) snapIS.close(); if (crcIn != null) crcIn.close(); } } if (!foundValid) { throw new IOException("Not able to find valid snapshots in " + snapDir); } //Zxid dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX); return dt.lastProcessedZxid; }
- snapshot文件的查找通過配置中的dataDir目錄中以snapshot開頭的文件
- 100個粉捻?在后續(xù)的恢復代碼中可以看到,snapshot文件可能出錯斑芜,需要從多個文件中重試肩刃,zookeeper可以通過配置定期刪除不需要的、舊的snapshot文件
- snapshot文件命名即為
snapshot.zxid
的形式杏头,根據(jù)snapshot的文件的名稱即可得到最新的zxid
接下來從log文件恢復snapshot文件缺失的數(shù)據(jù)盈包,
FileTxnSnapLog#fastForwardFromEdits
方法public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { // 1. log文件 FileTxnLog txnLog = new FileTxnLog(dataDir); // 2 iterator TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; try { while (true) { // 迭代到的TxnHeader hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error("..."); } else { highestZxid = hdr.getZxid(); } try { //3 processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("..."); } //4 listener.onTxnLoaded(hdr, itr.getTxn()); //5 if (!itr.next()) break; } } finally { if (itr != null) { itr.close(); } } return highestZxid; }
-
這個dataDir并不是配置文件中配置的dataDir,而是dataLogDir大州,在
ZooKeeperServerMain#runFromConfig
方法中续语,創(chuàng)建txnLog時txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir)); // FileTxnSnapLog構造函數(shù) public FileTxnSnapLog(File dataDir, File snapDir) throws IOException { this.dataDir = new File(dataDir, version + VERSION); this.snapDir = new File(snapDir, version + VERSION); txnLog = new FileTxnLog(this.dataDir); snapLog = new FileSnap(this.snapDir); }
可以看到配置中
dataDir
是snapLog文件的路徑,而dataLogDir
是txnLog的路徑厦画,但dataLogDir
不是必須配置的,在QuorumPeerConfig#parseProperties
方法滥朱,解析config時if (dataDir == null) { throw new IllegalArgumentException("dataDir is not set"); } if (dataLogDir == null) { dataLogDir = dataDir; }
可以看到
dataDir
是必須配置的根暑,而dataLogDir
不配置時會取dataDir
。 -
創(chuàng)建并初始化TxnIterator徙邻,初始化會讀取事務日志中比snapshot的zxid大的日志
// FileTxnIterator#init while (hdr.getZxid() < zxid) { if (!next()) return; }
處理本地事務排嫌,保存到dataTree中
-
將提議加入到廣播隊列中,等待選舉稱為Leader后廣播給其他Followers缰犁、Leaners淳地,暫不考慮,需要注意的是傳入的listener是調用
ZKDatabase#loadDataBase
方法傳入的commitProposalPlaybackListener
private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() { public void onTxnLoaded(TxnHeader hdr, Record txn){ addCommittedProposal(hdr, txn); } };
迭代下一個事務操作
通過讀取snapshot文件+log文件帅容,將內存數(shù)據(jù)庫恢復
-
回到第9步颇象,
zks.startup();
public synchronized void startup() { //1 會話管理器 if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); // 請求處理鏈 setupRequestProcessors(); // jmx registerJMX(); setState(State.RUNNING); notifyAll(); }
startup
方法有很多子流程 -
創(chuàng)建并啟動會話管理器
protected void createSessionTracker() { sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, 1, getZooKeeperServerListener()); } protected void startSessionTracker() { ((SessionTrackerImpl)sessionTracker).start(); }
在
SessionTrackerImpl
的構造函數(shù)中public SessionTrackerImpl(SessionExpirer expirer, ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime, long sid, ZooKeeperServerListener listener) { super("SessionTracker", listener); this.expirer = expirer; this.expirationInterval = tickTime; this.sessionsWithTimeout = sessionsWithTimeout; nextExpirationTime = roundToInterval(Time.currentElapsedTime()); this.nextSessionId = initializeNextSession(sid); for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) { addSession(e.getKey(), e.getValue()); } }
- 傳入的
expirer
是zks實例,也就是會調用ZooKeeperServer#expire
方法過期session -
expirationInterval
是配置文件中配置的tickTime
并徘,作為過期分桶的最小間隔 -
sessionsWithTimeout
sessionId的過期時間 -
nextExpirationTime
下次清理過期session的時間遣钳,從SessionTrackerImpl#roundToInterval
方法可以看出分桶過期策略 -
nextSessionId
分配sessionId時起始值,值為serverId(8) + time(40) + 0(16)
:高8為為serverId(這里傳入的1)麦乞,中40位為時間蕴茴,底16位遞增 - 遍歷
sessionsWithTimeout
調用addSession
方法,這個方法內部創(chuàng)建SessionImpl
實例作為session的實體姐直,同時調用SessionTrackerImpl#touchSession
方法將session分桶倦淀,同樣是調用roundToInterval
方法
啟動:sessionTracker作為線程啟動,后臺定期清理過期數(shù)據(jù)
- 傳入的
-
設置請求處理鏈
protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); }
單機模式的請求鏈
PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
注冊JMX服務
那么至此單機版zk就啟動完成了声畏,接下來便是處理請求撞叽、響應請求、觸發(fā)節(jié)點監(jiān)聽等處理了。