1 Server-side overview
[Figure: overview diagram]
- ServerCnxnFactory: handles network I/O with clients; supports NIO (the default) and Netty
- SessionTrackerImpl: the session manager
- DatadirCleanupManager: periodically purges old log and snapshot files from disk
- PrepRequestProcessor, SyncRequestProcessor, FinalRequestProcessor: the request-processing pipeline, built as a chain of responsibility
- LearnerHandler: handles the interaction between the Leader and a Learner
- FileTxnSnapLog: the log files stored on disk
- DataTree: the in-memory storage structure
- Sessions: stores session-related information
2 Standalone server startup flow
- Run the main method of QuorumPeerMain, which first creates a QuorumPeerMain object
- Call the initializeAndRun method
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException
{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }
    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
    if (args.length == 1 && config.servers.size() > 0) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
2.1
- args effectively carries the zoo.cfg configuration: args[0] is the path to that file, and config.parse reads it
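As a rough illustration of what reading such a file involves, the sketch below loads zoo.cfg-style text as Java properties and pulls out the two purge settings. The class ZooCfgSketch and its helper methods are hypothetical and not part of ZooKeeper, and the sample values are made up; only the key names are the standard zoo.cfg ones.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of ZooKeeper: parses zoo.cfg-style text
// and extracts the settings that the purge task uses.
class ZooCfgSketch {
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Falls back to the given default when the key is absent.
    static int intSetting(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        // Sample values are illustrative only.
        String cfg = String.join("\n",
            "tickTime=2000",
            "dataDir=/tmp/zookeeper",
            "clientPort=2181",
            "autopurge.snapRetainCount=3",
            "autopurge.purgeInterval=1");
        Properties props = load(cfg);
        System.out.println("dataDir = " + props.getProperty("dataDir"));
        System.out.println("snapRetainCount = "
            + intSetting(props, "autopurge.snapRetainCount", 3));
        System.out.println("purgeInterval = "
            + intSetting(props, "autopurge.purgeInterval", 0));
    }
}
```

With no server entries in the file, initializeAndRun above would take the standalone branch.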
2.2
- Create a DatadirCleanupManager instance. Its parameters are snapDir, dataLogDir, snapRetainCount (the number of snapshot files to keep) and purgeInterval (how often to purge, in hours). Both snapRetainCount and purgeInterval can be set in zoo.cfg
- Call the start method of DatadirCleanupManager. It relies mainly on PurgeTask, a task that runs on its own thread; its run method calls the purge method of PurgeTxnLog:
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    // Refuse to keep fewer than 3 snapshot files
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }
    // Build a FileTxnSnapLog from dataDir and snapDir
    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
    // Fetch the num most recent snapshot files, newest first
    List<File> snaps = txnLog.findNRecentSnapshots(num);
    // How many were actually found
    int numSnaps = snaps.size();
    if (numSnaps > 0) {
        // The second argument is the oldest of the snapshots being kept;
        // everything older than it becomes a candidate for deletion
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}
Finding the n most recent snapshot files:
public List<File> findNRecentSnapshots(int n) throws IOException {
    List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
    int count = 0;
    List<File> list = new ArrayList<File>();
    for (File f: files) {
        if (count == n)
            break;
        if (Util.getZxidFromName(f.getName(), SNAPSHOT_FILE_PREFIX) != -1) {
            count++;
            list.add(f);
        }
    }
    return list;
}
purgeOlderSnapshots:
static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) {
    // Extract the zxid from the snapshot file name
    final long leastZxidToBeRetain = Util.getZxidFromName(
            snapShot.getName(), PREFIX_SNAPSHOT);
    /**
     * We delete all files whose name carries a zxid smaller than leastZxidToBeRetain.
     * The rule applies to both snapshot files and log files, with one exception for logs:
     * a log file with a zxid smaller than X may contain transactions with a zxid larger than X.
     * More precisely, a log file named log.(X-a) may contain transactions newer than snapshot.X
     * if no other log file has a starting zxid in the interval (X-a, X].
     */
    final Set<File> retainedTxnLogs = new HashSet<File>();
    // Collect the logs that go with the snapshot; they may contain transactions
    // newer than the given zxid, so they must be retained
    retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain)));
    /**
     * Finds all candidates for deletion, which are files with a zxid in their name that is less
     * than leastZxidToBeRetain. There's an exception to this rule, as noted above.
     */
    class MyFileFilter implements FileFilter{
        private final String prefix;
        MyFileFilter(String prefix){
            this.prefix=prefix;
        }
        public boolean accept(File f){
            if(!f.getName().startsWith(prefix + "."))
                return false;
            if (retainedTxnLogs.contains(f)) {
                return false;
            }
            long fZxid = Util.getZxidFromName(f.getName(), prefix);
            if (fZxid >= leastZxidToBeRetain) {
                return false;
            }
            return true;
        }
    }
    // add all non-excluded log files
    List<File> files = new ArrayList<File>();
    File[] fileArray = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG));
    if (fileArray != null) {
        files.addAll(Arrays.asList(fileArray));
    }
    // add all non-excluded snapshot files to the deletion list
    fileArray = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT));
    if (fileArray != null) {
        files.addAll(Arrays.asList(fileArray));
    }
    // remove the old files
    for(File f: files)
    {
        final String msg = "Removing file: "+
            DateFormat.getDateTimeInstance().format(f.lastModified())+
            "\t"+f.getPath();
        LOG.info(msg);
        System.out.println(msg);
        if(!f.delete()){
            System.err.println("Failed to remove "+f.getPath());
        }
    }
}
The method first works out which files must be retained, then deletes the older files that are not in that set.
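The retention rule can be tried out on plain file names without touching the disk. The sketch below is a simplified model under stated assumptions, not the ZooKeeper implementation: PurgeSelectionSketch and its methods are hypothetical, file names are assumed to have the form log.<hex zxid> or snapshot.<hex zxid>, and the "retained logs" exception is reduced to keeping the newest log that starts at or before the retained snapshot.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, in-memory illustration of the retention rule; it works on
// file names only and never deletes anything.
class PurgeSelectionSketch {
    // Parses the hexadecimal zxid from names such as "log.1f"; returns -1
    // when the name does not have the form "<prefix>.<hex>".
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");
        if (parts.length != 2 || !parts[0].equals(prefix)) {
            return -1;
        }
        try {
            return Long.parseLong(parts[1], 16);
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    static List<String> selectForDeletion(List<String> names, long leastZxidToBeRetain) {
        // The newest log that starts at or before the retained snapshot may
        // still hold transactions newer than that snapshot, so it is kept.
        long newestLogAtOrBefore = -1;
        for (String name : names) {
            long zxid = zxidFromName(name, "log");
            if (zxid != -1 && zxid <= leastZxidToBeRetain && zxid > newestLogAtOrBefore) {
                newestLogAtOrBefore = zxid;
            }
        }
        List<String> doomed = new ArrayList<>();
        for (String name : names) {
            long logZxid = zxidFromName(name, "log");
            long snapZxid = zxidFromName(name, "snapshot");
            boolean oldLog = logZxid != -1 && logZxid < leastZxidToBeRetain
                    && logZxid != newestLogAtOrBefore;
            boolean oldSnap = snapZxid != -1 && snapZxid < leastZxidToBeRetain;
            if (oldLog || oldSnap) {
                doomed.add(name);
            }
        }
        return doomed;
    }

    public static void main(String[] args) {
        List<String> names = List.of(
            "log.1", "log.5", "log.c", "snapshot.4", "snapshot.a", "snapshot.f");
        // Retain everything from snapshot.a (zxid 0xa) onwards.
        System.out.println(selectForDeletion(names, 0xa));
    }
}
```

In this example log.5 survives even though its zxid is below 0xa, because no other log starts between 0x5 and 0xa, so it may hold transactions that snapshot.a does not cover.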
2.3
Decide between standalone and cluster startup: cluster mode goes through runFromConfig(config), while standalone mode goes through ZooKeeperServerMain.main(args) (which ultimately ends up in the runFromConfig method of ZooKeeperServerMain).
The runFromConfig method:
public void runFromConfig(ServerConfig config) throws IOException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        final ZooKeeperServer zkServer = new ZooKeeperServer();
        // Registers shutdown handler which will be used to know the
        // server error or shutdown state changes.
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));
        // Build the FileTxnSnapLog object
        txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                config.dataDir));
        zkServer.setTxnLogFactory(txnLog);
        zkServer.setTickTime(config.tickTime);
        zkServer.setMinSessionTimeout(config.minSessionTimeout);
        zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
        // Build the component that handles network communication with clients.
        // zookeeper.serverCnxnFactory selects between NIO and Netty
        cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns());
        cnxnFactory.startup(zkServer);
        // Watch status of ZooKeeper server. It will do a graceful shutdown
        // if the server is not running or hits an internal error.
        shutdownLatch.await();
        shutdown();
        cnxnFactory.join();
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
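The shutdownLatch.await() call is what keeps the main thread parked until the server reports an error or a shutdown. A minimal sketch of that pattern follows. ShutdownLatchSketch, its State enum and its ShutdownHandler are hypothetical stand-ins written for this illustration, and the assumption that only ERROR and SHUTDOWN release the latch is a simplification.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins for the server state and its shutdown handler.
class ShutdownLatchSketch {
    enum State { RUNNING, ERROR, SHUTDOWN }

    static final class ShutdownHandler {
        private final CountDownLatch latch;

        ShutdownHandler(CountDownLatch latch) {
            this.latch = latch;
        }

        // Releases the waiting thread only for terminal states.
        void handle(State state) {
            if (state == State.ERROR || state == State.SHUTDOWN) {
                latch.countDown();
            }
        }
    }

    // Reports `reported` from a separate thread and waits for the latch.
    // Returns true if the latch was released within the timeout.
    static boolean awaitShutdown(State reported, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        ShutdownHandler handler = new ShutdownHandler(latch);
        Thread server = new Thread(() -> handler.handle(reported));
        server.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("SHUTDOWN releases the latch: "
            + awaitShutdown(State.SHUTDOWN, 2000));
        System.out.println("RUNNING releases the latch: "
            + awaitShutdown(State.RUNNING, 100));
    }
}
```

The sketch uses a timed await so it always terminates, whereas runFromConfig waits without a timeout.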
Once the network component has been initialized, cnxnFactory.startup(zkServer) is called. The default network component, NIOServerCnxnFactory, is used as the example here:
@Override
public void start() {
    // ensure thread is started once and only once
    if (thread.getState() == Thread.State.NEW) {
        thread.start();
    }
}
@Override
public void startup(ZooKeeperServer zks) throws IOException,
        InterruptedException {
    // Calls the start method above, which in turn calls thread.start();
    // that runs this class's run method and brings up the network service
    start();
    // Set the ZooKeeperServer
    setZooKeeperServer(zks);
    zks.startdata();
    zks.startup();
}
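The Thread.State.NEW check in start() makes the call safe to repeat, since calling Thread.start() twice on the same thread throws IllegalThreadStateException. The sketch below shows the guard in isolation; StartOnceSketch is a hypothetical class, and the counter exists only so the effect can be observed.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper that starts its worker thread at most once.
class StartOnceSketch {
    final AtomicInteger runs = new AtomicInteger();
    private final Thread thread = new Thread(() -> runs.incrementAndGet());

    // A thread that has already been started is no longer in state NEW,
    // so a second call does nothing instead of throwing.
    void start() {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }

    void join() {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        StartOnceSketch sketch = new StartOnceSketch();
        sketch.start();
        sketch.start(); // ignored: the thread has left the NEW state
        sketch.join();
        System.out.println("worker ran " + sketch.runs.get() + " time(s)");
    }
}
```

The check and the start are two separate steps, so this guard covers repeated calls from one thread; concurrent callers would need additional synchronization.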
2.4
The zks.startdata() method:
public void startdata()
        throws IOException, InterruptedException {
    //check to see if zkDb is not null
    if (zkDb == null) {
        zkDb = new ZKDatabase(this.txnLogFactory);
    }
    if (!zkDb.isInitialized()) {
        // loadData performs the initialization
        loadData();
    }
}
ZKDatabase maintains ZooKeeper's sessions, DataTree and committed-log collection in memory. When the ZooKeeper server starts, the txnlogs and snapshots are read from disk into memory.
The loadData() method called above (excerpt), followed by ZKDatabase's loadDataBase():
// If zkDb is already initialized, set the zxid to the latest zxid
// of the in-memory DataTree
if(zkDb.isInitialized()){
    setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
    // Not initialized yet, so load the database
    setZxid(zkDb.loadDataBase());
}
public long loadDataBase() throws IOException {
    long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    initialized = true;
    return zxid;
}
loadDataBase() internally calls the restore method of FileTxnSnapLog.
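The source does not show restore itself, so the following is only a conceptual sketch of "load a snapshot, then replay newer logged transactions", which is the general shape of reading snapshots and txnlogs into memory. Every name in it (RestoreSketch, Txn, the map-based tree) is invented for illustration and none of it is ZooKeeper code.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "snapshot plus log replay".
class RestoreSketch {
    // One logged write: set `path` to `value` at transaction id `zxid`.
    static final class Txn {
        final long zxid;
        final String path;
        final String value;

        Txn(long zxid, String path, String value) {
            this.zxid = zxid;
            this.path = path;
            this.value = value;
        }
    }

    // Fills `tree` from the snapshot, applies every logged transaction newer
    // than the snapshot, and returns the highest zxid the tree now reflects.
    static long restore(Map<String, String> tree, Map<String, String> snapshot,
                        long snapshotZxid, List<Txn> log) {
        tree.clear();
        tree.putAll(snapshot);
        long highest = snapshotZxid;
        for (Txn txn : log) {
            if (txn.zxid <= snapshotZxid) {
                continue; // already captured by the snapshot
            }
            tree.put(txn.path, txn.value);
            highest = Math.max(highest, txn.zxid);
        }
        return highest;
    }

    public static void main(String[] args) {
        Map<String, String> tree = new TreeMap<>();
        long zxid = restore(tree, Map.of("/a", "1"), 2L, List.of(
            new Txn(1, "/a", "0"),
            new Txn(2, "/a", "1"),
            new Txn(3, "/b", "x"),
            new Txn(4, "/a", "2")));
        System.out.println("last zxid = " + zxid + ", tree = " + tree);
    }
}
```

The returned value plays the role of the zxid that loadData() passes to setZxid.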
2.5
The zks.startup() method:
public synchronized void startup() {
    if (sessionTracker == null) {
        // Create the session tracker
        createSessionTracker();
    }
    // Start the session tracker
    startSessionTracker();
    // Set up the request processors
    setupRequestProcessors();
    // Register with JMX
    registerJMX();
    setState(State.RUNNING);
    notifyAll();
}
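setupRequestProcessors() wires up the chain of responsibility mentioned in the overview. The sketch below shows one way such a chain can be assembled, assuming a three-stage prep, sync, final order. The Processor interface and Stage class are hypothetical and far simpler than the real processors, which do actual validation, logging and application of requests.

```java
import java.util.ArrayList;
import java.util.List;

class ProcessorChainSketch {
    // Hypothetical minimal interface for one link in the chain.
    interface Processor {
        void process(String request);
    }

    // A stage that records its name, then forwards the request to the
    // next stage if there is one.
    static final class Stage implements Processor {
        private final String name;
        private final Processor next;
        private final List<String> trace;

        Stage(String name, Processor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void process(String request) {
            trace.add(name + ":" + request);
            if (next != null) {
                next.process(request);
            }
        }
    }

    // Builds the chain back to front so each stage can be handed its
    // successor, and returns the first stage.
    static Processor build(List<String> trace) {
        Processor finalStage = new Stage("final", null, trace);
        Processor syncStage = new Stage("sync", finalStage, trace);
        return new Stage("prep", syncStage, trace);
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        build(trace).process("create /a");
        System.out.println(trace);
    }
}
```

Building from the last stage backwards means no stage needs a setter for its successor, and callers only ever hold a reference to the first stage.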