ZooKeeper服務端啟動源碼 單機

從服務端啟動腳本可以看到啟動類為org.apache.zookeeper.server.quorum.QuorumPeerMain,在這個類注釋了關于啟動程序的配置文件

第一個參數(shù)為配置文件冗茸,用于獲取配置信息联喘。該文件是一個屬性文件索昂,因此鍵和值由equals(=)分隔,鍵/值對由換行分隔。下面是配置文件中使用的鍵的一般摘要契耿。

  • dataDir - The directory where the ZooKeeper data is stored.
  • dataLogDir - The directory where the ZooKeeper transaction log is stored.
  • clientPort - The port used to communicate with clients.
  • tickTime - The duration of a tick in milliseconds. This is the basic unit of time in ZooKeeper.
  • initLimit - The maximum number of ticks that a follower will wait to initially synchronize with a leader.
  • syncLimit - The maximum number of ticks that a follower will wait for a message (including heartbeats) from the leader.
  • server.id - This is the host:port[:port] that the server with the given id will use for the quorum protocol.

除了配置文件。數(shù)據(jù)目錄中有一個名為“myid”的文件螃征,其中包含服務器id作為ASCII十進制值搪桂。

public static void main(String[] args) {
  QuorumPeerMain main = new QuorumPeerMain();
  main.initializeAndRun(args);
}

initializeAndRun方法中初始化并啟動

protected void initializeAndRun(String[] args) {
  QuorumPeerConfig config = new QuorumPeerConfig();
  if (args.length == 1) {
        //解析配置文件zoo.cfg
    config.parse(args[0]);
  }

  // 創(chuàng)建并啟動歷史文件清理器DatadirCleanupManager
  DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
        .getDataDir(), config.getDataLogDir(), config
        .getSnapRetainCount(), config.getPurgeInterval());
  purgeMgr.start();
    // 判斷是分布式還是單機
  if (args.length == 1 && config.isDistributed()) {
    runFromConfig(config);
  } else {
    LOG.warn("。。");
    // there is only server in the quorum -- run as standalone
    ZooKeeperServerMain.main(args);
  }
}
  1. 解析配置文件zoo.cfg
  2. 創(chuàng)建并啟動歷史文件清理器DatadirCleanupManager
  3. 判斷當前是集群模式還是單機模式的啟動

先分析單機模式踢械,調用ZooKeeperServerMain.main(args);酗电,這個類啟動并運行一個獨立的ZooKeeperServer。

  1. 注冊MBean

    ManagedUtil.registerLog4jMBeans();
    
  2. 解析ServerConfig

    ServerConfig config = new ServerConfig();
    config.parse(args[0]);
    
  3. 啟動

    runFromConfig(config);
    

進入runFromConfig方法后

  1. 創(chuàng)建ZooKeeperServer實例

    final ZooKeeperServer zkServer = new ZooKeeperServer();
    
  2. 實例化FileTxnSnapLog内列,txnlog和snapshot的助手類

 txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
  1. 設置zkServer屬性

    zkServer.setTxnLogFactory(txnLog);
    zkServer.setTickTime(config.tickTime);
    zkServer.setMinSessionTimeout(config.minSessionTimeout);
    zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
    
  2. 創(chuàng)建cnxnFactory撵术,配置并啟動,默認為NIOServerCnxnFactory

    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
    cnxnFactory.startup(zkServer);
    

    configure配置連接工廠

    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
      configureSaslLogin();
    
      thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
      thread.setDaemon(true);
      maxClientCnxns = maxcc;
      this.ss = ServerSocketChannel.open();
      ss.socket().setReuseAddress(true);
      LOG.info("binding to port " + addr);
      ss.socket().bind(addr);
      ss.configureBlocking(false);
      ss.register(selector, SelectionKey.OP_ACCEPT);
    }
    
    1. 創(chuàng)建一個后臺線程话瞧,傳參為this嫩与,也就是說thread線程將執(zhí)行cnxnFactory的方法
    2. 打開ServerSocketChannel,監(jiān)聽端口
  3. 主線程阻塞并等待信號交排,優(yōu)雅關機

    shutdownLatch.await();
    shutdown();
    //主線程等待IO線程執(zhí)行完成
    cnxnFactory.join();
    if (zkServer.canShutdown()) {
        zkServer.shutdown(true);
    }
    

第8步并不是主流程划滋,而是等待shutdownLatch的信號,執(zhí)行關機操作

  1. 進入NIOServerCnxnFactory的startup方法

    start();
    setZooKeeperServer(zks);
    zks.startdata();
    zks.startup();
    
  2. start方法啟動接收連接的線程

thread.start();

在第7步說了埃篓,該thread將執(zhí)行cnxnFactory的run方法处坪,run方法內部就是接收連接、處理新連接架专,也是常規(guī)的NIO操作

for (SelectionKey k : selectedList) {
  if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
    InetAddress ia = sc.socket().getInetAddress();
    int cnxncount = getClientCnxnCount(ia);
    // 1 這個是對同一個ip限制最大連接樹同窘,配置文件可配
    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
      LOG.warn("Too many connections from " + ia + " - max is " + maxClientCnxns );
      sc.close();
    } else {
      LOG.info("Accepted socket connection from " + sc.socket().getRemoteSocketAddress());
      sc.configureBlocking(false);
      SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
      // 2 為每個客戶端channel創(chuàng)建一個NIOServerCnxn,并把cnxn放到SelectionKey的attachment上
      NIOServerCnxn cnxn = createConnection(sc, sk);
      sk.attach(cnxn);
      addCnxn(cnxn);
    }
  } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
    // 3 讀寫部脚,取出attachment想邦,處理IO
    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
    c.doIO(k);
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Unexpected ops in select " + k.readyOps());
    }
  }
}
selected.clear();
  1. 新連接對同一個ip限制最大連接樹,配置文件可配
  2. 新連接為每個客戶端channel創(chuàng)建一個NIOServerCnxn睛低,并把cnxn放到SelectionKey的attachment上
  3. 讀寫操作案狠,取出上面的attachment,也就是NIOServerCnxn钱雷,處理IO

關于請求處理流程這里不分析

  1. 回到第9步骂铁,zks.startdata();恢復數(shù)據(jù),重點關注一下
    內部調用loadData方法罩抗,恢復session和節(jié)點數(shù)據(jù)

    public void loadData() {
      // 設置Zxid拉庵,這個在集群選舉的時候很重要
      if(zkDb.isInitialized()){
        setZxid(zkDb.getDataTreeLastProcessedZxid());
      } else {
        // 未初始化,加載數(shù)據(jù)并設置Zxid
        setZxid(zkDb.loadDataBase());
      }
    
      // 清理過期session
      LinkedList<Long> deadSessions = new LinkedList<Long>();
      for (Long session : zkDb.getSessions()) {
        if (zkDb.getSessionWithTimeOuts().get(session) == null) {
          deadSessions.add(session);
        }
      }
      zkDb.setDataTreeInit(true);
      for (long session : deadSessions) {
        // XXX: Is lastProcessedZxid really the best thing to use?
        killSession(session, zkDb.getDataTreeLastProcessedZxid());
      }
    }
    

    ZKDatabase#loadDataBase方法內部通過調用FileTxnSnapLog#restore恢復數(shù)據(jù)套蒂,這里面有一些zk關于數(shù)據(jù)存儲和恢復的機制需要關注下钞支。

    /**
    * 這個函數(shù)在讀取快照和事務日志之后恢復服務器數(shù)據(jù)庫
    */
    public long restore(DataTree dt, Map<Long, Integer> sessions, 
                        PlayBackListener listener) throws IOException {
      // 1
      snapLog.deserialize(dt, sessions);
      // 2
      return fastForwardFromEdits(dt, sessions, listener);
    }
    
    1. 從最后一個有效快照反序列化數(shù)據(jù)樹,并返回最后一個反序列化的zxid
    2. 對服務器數(shù)據(jù)庫進行快進處理操刀,使其包含最新的事務烁挟。這與恢復相同,但只從事務日志中讀取骨坑,而不從快照中恢復撼嗓。

    簡單來說就是先通過snapshot恢復數(shù)據(jù)柬采,然后從log中補充缺失的數(shù)據(jù)。

    進到FileSnap#deserialize方法

    public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
      // 查找最新的且警、有效的100個snapshot文件
      List<File> snapList = findNValidSnapshots(100);
      if (snapList.size() == 0) {
        return -1L;
      }
      File snap = null;
      boolean foundValid = false;
      for (int i = 0; i < snapList.size(); i++) {
        snap = snapList.get(i);
        InputStream snapIS = null;
        CheckedInputStream crcIn = null;
        try {
          LOG.info("Reading snapshot " + snap);
          snapIS = new BufferedInputStream(new FileInputStream(snap));
          crcIn = new CheckedInputStream(snapIS, new Adler32());
          InputArchive ia = BinaryInputArchive.getArchive(crcIn);
          deserialize(dt,sessions, ia);
          long checkSum = crcIn.getChecksum().getValue();
          long val = ia.readLong("val");
          if (val != checkSum) {
            throw new IOException("CRC corruption in snapshot :  " + snap);
          }
          // 已恢復有效的snapshot文件
          foundValid = true;
          break;
        } catch(IOException e) {
          LOG.warn("problem reading snap file " + snap, e);
        } finally {
          if (snapIS != null) 
            snapIS.close();
          if (crcIn != null) 
            crcIn.close();
        } 
      }
      if (!foundValid) {
        throw new IOException("Not able to find valid snapshots in " + snapDir);
      }
      //Zxid
      dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
      return dt.lastProcessedZxid;
    }
    
    1. snapshot文件的查找通過配置中的dataDir目錄中以snapshot開頭的文件
    2. 100個粉捻?在后續(xù)的恢復代碼中可以看到,snapshot文件可能出錯斑芜,需要從多個文件中重試肩刃,zookeeper可以通過配置定期刪除不需要的、舊的snapshot文件
    3. snapshot文件命名即為snapshot.zxid的形式杏头,根據(jù)snapshot的文件的名稱即可得到最新的zxid

    接下來從log文件恢復snapshot文件缺失的數(shù)據(jù)盈包,FileTxnSnapLog#fastForwardFromEdits方法

    public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                     PlayBackListener listener) throws IOException {
      // 1. log文件
      FileTxnLog txnLog = new FileTxnLog(dataDir);
      // 2 iterator
      TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
      long highestZxid = dt.lastProcessedZxid;
      TxnHeader hdr;
      try {
        while (true) {
          // 迭代到的TxnHeader
          hdr = itr.getHeader();
          if (hdr == null) {
            //empty logs 
            return dt.lastProcessedZxid;
          }
          if (hdr.getZxid() < highestZxid && highestZxid != 0) {
            LOG.error("...");
          } else {
            highestZxid = hdr.getZxid();
          }
          try {
            //3
            processTransaction(hdr,dt,sessions, itr.getTxn());
          } catch(KeeperException.NoNodeException e) {
            throw new IOException("...");
          }
          //4
          listener.onTxnLoaded(hdr, itr.getTxn());
          //5
          if (!itr.next()) 
            break;
        }
      } finally {
        if (itr != null) {
          itr.close();
        }
      }
      return highestZxid;
    }
    
    1. 這個dataDir并不是配置文件中配置的dataDir,而是dataLogDir大州,在ZooKeeperServerMain#runFromConfig方法中续语,創(chuàng)建txnLog時

      txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir));
      // FileTxnSnapLog構造函數(shù)
      public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
        this.dataDir = new File(dataDir, version + VERSION);
        this.snapDir = new File(snapDir, version + VERSION);
        
        txnLog = new FileTxnLog(this.dataDir);
        snapLog = new FileSnap(this.snapDir);
      }
      

      可以看到配置中dataDir是snapLog文件的路徑,而dataLogDir是txnLog的路徑厦画,但dataLogDir不是必須配置的,在QuorumPeerConfig#parseProperties方法滥朱,解析config時

      if (dataDir == null) {
        throw new IllegalArgumentException("dataDir is not set");
      }
      if (dataLogDir == null) {
        dataLogDir = dataDir;
      }
      

      可以看到dataDir是必須配置的根暑,而dataLogDir不配置時會取dataDir

    2. 創(chuàng)建并初始化TxnIterator徙邻,初始化會讀取事務日志中比snapshot的zxid大的日志

      // FileTxnIterator#init
      while (hdr.getZxid() < zxid) {
        if (!next())
          return;
      }
      
    3. 處理本地事務排嫌,保存到dataTree中

    4. 將提議加入到廣播隊列中,等待選舉稱為Leader后廣播給其他Followers缰犁、Leaners淳地,暫不考慮,需要注意的是傳入的listener是調用ZKDatabase#loadDataBase方法傳入的commitProposalPlaybackListener

      private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
        public void onTxnLoaded(TxnHeader hdr, Record txn){
          addCommittedProposal(hdr, txn);
        }
      };
      
    5. 迭代下一個事務操作

    通過讀取snapshot文件+log文件帅容,將內存數(shù)據(jù)庫恢復

  2. 回到第9步颇象,zks.startup();

    public synchronized void startup() {
      //1 會話管理器
      if (sessionTracker == null) {
        createSessionTracker();
      }
      startSessionTracker();
      // 請求處理鏈
      setupRequestProcessors();
    
      // jmx
      registerJMX();
    
      setState(State.RUNNING);
      notifyAll();
    }
    

    startup方法有很多子流程

  3. 創(chuàng)建并啟動會話管理器

    protected void createSessionTracker() {
      sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                                              tickTime, 1, getZooKeeperServerListener());
    }
    protected void startSessionTracker() {
      ((SessionTrackerImpl)sessionTracker).start();
    }
    

    SessionTrackerImpl的構造函數(shù)中

    public SessionTrackerImpl(SessionExpirer expirer,
      ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
      long sid, ZooKeeperServerListener listener) {
      super("SessionTracker", listener);
      this.expirer = expirer;
      this.expirationInterval = tickTime;
      this.sessionsWithTimeout = sessionsWithTimeout;
      nextExpirationTime = roundToInterval(Time.currentElapsedTime());
      this.nextSessionId = initializeNextSession(sid);
      for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
        addSession(e.getKey(), e.getValue());
      }
    }
    
    • 傳入的expirer是zks實例,也就是會調用ZooKeeperServer#expire方法過期session
    • expirationInterval是配置文件中配置的tickTime并徘,作為過期分桶的最小間隔
    • sessionsWithTimeoutsessionId的過期時間
    • nextExpirationTime下次清理過期session的時間遣钳,從SessionTrackerImpl#roundToInterval方法可以看出分桶過期策略
    • nextSessionId分配sessionId時起始值,值為serverId(8) + time(40) + 0(16):高8為為serverId(這里傳入的1)麦乞,中40位為時間蕴茴,底16位遞增
    • 遍歷sessionsWithTimeout調用addSession方法,這個方法內部創(chuàng)建SessionImpl實例作為session的實體姐直,同時調用SessionTrackerImpl#touchSession方法將session分桶倦淀,同樣是調用roundToInterval方法

    啟動:sessionTracker作為線程啟動,后臺定期清理過期數(shù)據(jù)

  4. 設置請求處理鏈

    protected void setupRequestProcessors() {
      RequestProcessor finalProcessor = new FinalRequestProcessor(this);
      RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
      ((SyncRequestProcessor)syncProcessor).start();
      firstProcessor = new PrepRequestProcessor(this, syncProcessor);
      ((PrepRequestProcessor)firstProcessor).start();
    }
    

    單機模式的請求鏈

    PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
    
  5. 注冊JMX服務

那么至此單機版zk就啟動完成了声畏,接下來便是處理請求撞叽、響應請求、觸發(fā)節(jié)點監(jiān)聽等處理了。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末能扒,一起剝皮案震驚了整個濱河市佣渴,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌初斑,老刑警劉巖辛润,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異见秤,居然都是意外死亡砂竖,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門鹃答,熙熙樓的掌柜王于貴愁眉苦臉地迎上來乎澄,“玉大人,你說我怎么就攤上這事测摔≈眉茫” “怎么了?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵锋八,是天一觀的道長浙于。 經(jīng)常有香客問我,道長挟纱,這世上最難降的妖魔是什么羞酗? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮紊服,結果婚禮上檀轨,老公的妹妹穿的比我還像新娘。我一直安慰自己欺嗤,他們只是感情好参萄,可當我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著剂府,像睡著了一般拧揽。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上腺占,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天淤袜,我揣著相機與錄音,去河邊找鬼衰伯。 笑死铡羡,一個胖子當著我的面吹牛,可吹牛的內容都是我干的意鲸。 我是一名探鬼主播烦周,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼尽爆,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了读慎?” 一聲冷哼從身側響起漱贱,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎夭委,沒想到半個月后幅狮,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡株灸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年崇摄,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片慌烧。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡逐抑,死狀恐怖,靈堂內的尸體忽然破棺而出屹蚊,到底是詐尸還是另有隱情厕氨,我是刑警寧澤,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布汹粤,位于F島的核電站腐巢,受9級特大地震影響,放射性物質發(fā)生泄漏玄括。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一肉瓦、第九天 我趴在偏房一處隱蔽的房頂上張望遭京。 院中可真熱鬧,春花似錦泞莉、人聲如沸哪雕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽斯嚎。三九已至,卻和暖如春挨厚,著一層夾襖步出監(jiān)牢的瞬間堡僻,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工疫剃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留钉疫,地道東北人。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓巢价,卻偏偏與公主長得像牲阁,于是被迫代替她去往敵國和親固阁。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,107評論 2 356

推薦閱讀更多精彩內容