[Zookeeper] 服務(wù)端之單機(jī)版服務(wù)器啟動(dòng)

1 服務(wù)器端整體概覽圖

概覽圖
  • ServerCnxnFactory:負(fù)責(zé)與client之間的網(wǎng)絡(luò)交互,支持NIO(默認(rèn))以及Netty
  • SessionTrackerImpl:會(huì)話管理器
  • DatadirCleanupManager:定期清理存在磁盤(pán)上的log文件和snapshot文件
  • PreRequestProcessor争剿,SyncRequestProcessor郁季,F(xiàn)inalRequestProcessor:請(qǐng)求處理流程旬蟋,責(zé)任鏈模式
  • LearnerHandler:Leader與Learner之間的交互
  • FileTxnSnapLog:存儲(chǔ)在磁盤(pán)上的日志文件
  • DataTree:體現(xiàn)在內(nèi)存中的存儲(chǔ)結(jié)構(gòu)
  • Sessions:Session的相關(guān)信息存儲(chǔ)

2 單機(jī)版服務(wù)器啟動(dòng)流程

  1. 執(zhí)行QuorumPeerMain的main方法陋守,其中先創(chuàng)建一個(gè)QuorumPeerMain對(duì)象
  2. 調(diào)用initializeAndRun方法
    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

2.1

  • args實(shí)際上就是zoo.cfg中的配置炫刷,如下

2.2

  • 創(chuàng)建DatadirCleanupManager實(shí)例橘原,參數(shù)有snapDir籍铁,dataLogDir,snapRetainCount(要保存snapshot文件的個(gè)數(shù))靠柑,purgeInterval(定期清理的頻率寨辩,單位為小時(shí)),snapRetainCountpurgeInterval在zoo.cfg中均可以配置
  • 調(diào)用DatadirCleanupManagerstart方法歼冰,里面主要依賴PurgeTask靡狞,這也是一個(gè)線程,其run方法

PurgeTxnLogpurge方法:

    public static void purge(File dataDir, File snapDir, int num) throws IOException {
        // snapshot文件保存的數(shù)量小于3,拋異常
        if (num < 3) {
            throw new IllegalArgumentException(COUNT_ERR_MSG);
        }
        // 根據(jù)dataDir和snapDir創(chuàng)建FileTxnSnapLog
        FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
        // 根據(jù)給定數(shù)量獲取最近的文件
        List<File> snaps = txnLog.findNRecentSnapshots(num);
        // 獲取數(shù)目
        int numSnaps = snaps.size();
        if (numSnaps > 0) {
            // 第二個(gè)參數(shù)是最近的snapshot文件
            purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
        }
    }

找最近n個(gè)snapshot文件:

    public List<File> findNRecentSnapshots(int n) throws IOException {
        List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
        int count = 0;
        List<File> list = new ArrayList<File>();
        for (File f: files) {
            if (count == n)
                break;
            if (Util.getZxidFromName(f.getName(), SNAPSHOT_FILE_PREFIX) != -1) {
                count++;
                list.add(f);
            }
        }
        return list;
    }

purgeOlderSnapshots:

    static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) {
        // 從snapshot文件名中獲取zxid
        final long leastZxidToBeRetain = Util.getZxidFromName(
                snapShot.getName(), PREFIX_SNAPSHOT);

        /**
         * 我們刪除名稱(chēng)中帶有zxid且小于leastZxidToBeRetain的所有文件隔嫡。
         * 該規(guī)則適用于快照文件和日志文件甸怕,
         * zxid小于X的日志文件可能包含zxid大于X的事務(wù)。
         * 更準(zhǔn)確的說(shuō),命名為log.(X-a)的log文件可能包含比snapshot.X文件更新的事務(wù),
         * 如果在該間隔中沒(méi)有其他以zxid開(kāi)頭即(X-a腮恩,X]的日志文件
 
         */
        final Set<File> retainedTxnLogs = new HashSet<File>();
        //獲取快照日志梢杭,其中可能包含比給定zxid更新的事務(wù),這些日志需要保留下來(lái)
        retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain)));

        /**
         * Finds all candidates for deletion, which are files with a zxid in their name that is less
         * than leastZxidToBeRetain.  There's an exception to this rule, as noted above.
         */
        class MyFileFilter implements FileFilter{
            private final String prefix;
            MyFileFilter(String prefix){
                this.prefix=prefix;
            }
            public boolean accept(File f){
                if(!f.getName().startsWith(prefix + "."))
                    return false;
                if (retainedTxnLogs.contains(f)) {
                    return false;
                }
                long fZxid = Util.getZxidFromName(f.getName(), prefix);
                if (fZxid >= leastZxidToBeRetain) {
                    return false;
                }
                return true;
            }
        }
        // add all non-excluded log files
        List<File> files = new ArrayList<File>();
        File[] fileArray = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG));
        if (fileArray != null) {
            files.addAll(Arrays.asList(fileArray));
        }

        // add all non-excluded snapshot files to the deletion list
        fileArray = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT));
        if (fileArray != null) {
            files.addAll(Arrays.asList(fileArray));
        }

        // remove the old files
        for(File f: files)
        {
            final String msg = "Removing file: "+
                DateFormat.getDateTimeInstance().format(f.lastModified())+
                "\t"+f.getPath();
            LOG.info(msg);
            System.out.println(msg);
            if(!f.delete()){
                System.err.println("Failed to remove "+f.getPath());
            }
        }

    }

先獲取到那些需要保留的文件,之后再去刪除這些不在保留文件之內(nèi)的文件秸滴。

2.3
判斷集群是單機(jī)啟動(dòng)還是集群?jiǎn)?dòng)武契,集群走runFromConfig(config),單機(jī)走ZooKeeperServerMain.main(args)(其實(shí)單機(jī)版最終走的是ZooKeeperServerMainrunFromConfig,)

runFromConfig方法:

    public void runFromConfig(ServerConfig config) throws IOException {
        LOG.info("Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            final ZooKeeperServer zkServer = new ZooKeeperServer();
            // Registers shutdown handler which will be used to know the
            // server error or shutdown state changes.
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            zkServer.registerServerShutdownHandler(
                    new ZooKeeperServerShutdownHandler(shutdownLatch));
            // 構(gòu)建FileTxnSnapLog對(duì)象
            txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                    config.dataDir));
            zkServer.setTxnLogFactory(txnLog);
            zkServer.setTickTime(config.tickTime);
            zkServer.setMinSessionTimeout(config.minSessionTimeout);
            zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
            // 構(gòu)建與client之間的網(wǎng)絡(luò)通信服務(wù)組件
            // 這里可以通過(guò)zookeeper.serverCnxnFactory配置NIO還是Netty
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());
            cnxnFactory.startup(zkServer);
            // Watch status of ZooKeeper server. It will do a graceful shutdown
            // if the server is not running or hits an internal error.
            shutdownLatch.await();
            shutdown();

            cnxnFactory.join();
            if (zkServer.canShutdown()) {
                zkServer.shutdown(true);
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Server interrupted", e);
        } finally {
            if (txnLog != null) {
                txnLog.close();
            }
        }
    }

初始化網(wǎng)絡(luò)服務(wù)組件后咒唆,cnxnFactory.startup(zkServer);
這里以默認(rèn)網(wǎng)絡(luò)服務(wù)組件為例NIOServerCnxnFactory

    @Override
    public void start() {
        // ensure thread is started once and only once
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }

    @Override
    public void startup(ZooKeeperServer zks) throws IOException,
            InterruptedException {
        //調(diào)用上面的start方法,實(shí)際調(diào)用thread的start
        //也就調(diào)用了該類(lèi)的run方法,啟動(dòng)網(wǎng)絡(luò)服務(wù)
        start();
        //這是ZooKeeperServer
        setZooKeeperServer(zks);
        zks.startdata();
        zks.startup();
    }

2.4
zks.startdata()方法:

    public void startdata() 
    throws IOException, InterruptedException {
        //check to see if zkDb is not null
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);
        }  
        if (!zkDb.isInitialized()) {
            //loadData進(jìn)行初始化
            loadData();
        }
    }

ZKDatabase在內(nèi)存中維護(hù)了zookeeper的sessions届垫, datatree和commit logs集合。 當(dāng)zookeeper server啟動(dòng)的時(shí)候會(huì)將txnlogs和snapshots從磁盤(pán)讀取到內(nèi)存中

ZKDatabase的loadData()方法:

        //如果zkDb已經(jīng)初始化,設(shè)置zxid(內(nèi)存當(dāng)中DataTree最新的zxid)
        if(zkDb.isInitialized()){
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        }
        else {
            // 沒(méi)有初始化,就loadDataBase
            setZxid(zkDb.loadDataBase());
        }

      public long loadDataBase() throws IOException {
          long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
          initialized = true;
          return zxid;
      }

loadDataBase()內(nèi)部調(diào)用的是FileTxnSnapLogrestore方法

2.5
zks.startup()方法:

    public synchronized void startup() {
        if (sessionTracker == null) {
            // 創(chuàng)建會(huì)話管理器
            createSessionTracker();
        }
        // 啟動(dòng)會(huì)話管理器
        startSessionTracker();
        // 設(shè)置請(qǐng)求處理器
        setupRequestProcessors();
        // 注冊(cè)jmx
        registerJMX();

        setState(State.RUNNING);
        notifyAll();
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末全释,一起剝皮案震驚了整個(gè)濱河市装处,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌浸船,老刑警劉巖妄迁,帶你破解...
    沈念sama閱讀 222,252評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異李命,居然都是意外死亡登淘,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)项戴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)形帮,“玉大人,你說(shuō)我怎么就攤上這事周叮。” “怎么了界斜?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,814評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵仿耽,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我各薇,道長(zhǎng)项贺,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,869評(píng)論 1 299
  • 正文 為了忘掉前任峭判,我火速辦了婚禮开缎,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘林螃。我一直安慰自己奕删,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布疗认。 她就那樣靜靜地躺著完残,像睡著了一般。 火紅的嫁衣襯著肌膚如雪横漏。 梳的紋絲不亂的頭發(fā)上谨设,一...
    開(kāi)封第一講書(shū)人閱讀 52,475評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音缎浇,去河邊找鬼扎拣。 笑死,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的二蓝。 我是一名探鬼主播尊蚁,決...
    沈念sama閱讀 41,010評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼侣夷!你這毒婦竟也來(lái)了横朋?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,924評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤百拓,失蹤者是張志新(化名)和其女友劉穎琴锭,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體衙传,經(jīng)...
    沈念sama閱讀 46,469評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡决帖,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評(píng)論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蓖捶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片地回。...
    茶點(diǎn)故事閱讀 40,680評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖俊鱼,靈堂內(nèi)的尸體忽然破棺而出刻像,到底是詐尸還是另有隱情,我是刑警寧澤并闲,帶...
    沈念sama閱讀 36,362評(píng)論 5 351
  • 正文 年R本政府宣布细睡,位于F島的核電站,受9級(jí)特大地震影響帝火,放射性物質(zhì)發(fā)生泄漏溜徙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評(píng)論 3 335
  • 文/蒙蒙 一犀填、第九天 我趴在偏房一處隱蔽的房頂上張望蠢壹。 院中可真熱鬧,春花似錦九巡、人聲如沸图贸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,519評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)求妹。三九已至,卻和暖如春佳窑,著一層夾襖步出監(jiān)牢的瞬間制恍,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,621評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工神凑, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留净神,地道東北人何吝。 一個(gè)月前我還...
    沈念sama閱讀 49,099評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像鹃唯,于是被迫代替她去往敵國(guó)和親爱榕。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容