ZooKeeper服務(wù)端啟動(dòng)源碼 集群

集群和單機(jī)版啟動(dòng)類都是QuorumPeerMain穷绵,進(jìn)入initializeAndRun方法

啟動(dòng)

  1. 解析配置文件zoo.cfg
  2. 創(chuàng)建并啟動(dòng)歷史文件清理器DatadirCleanupManager
  3. 根據(jù)集群模式還是單機(jī)模式的啟動(dòng)
if (args.length == 1 && config.servers.size() > 0) {
  // 集群
  runFromConfig(config);
} else {
  ZooKeeperServerMain.main(args);
}

集群模式會(huì)進(jìn)入if塊

初始化

運(yùn)行runFromConfig方法杉辙,在runFromConfig方法內(nèi)部可以看到挑秉,其核心實(shí)例是QuorumPeer,而不再是單機(jī)模式的ZooKeeperServer實(shí)例勋磕,QuorumPeer實(shí)例可以看作是集群的一個(gè)節(jié)點(diǎn)妈候,集群中的所有的QuorumPeer實(shí)例協(xié)作完成集群的選舉、投票挂滓。

  1. 創(chuàng)建并配置ServerCnxnFactory苦银,和單機(jī)版一致。

    ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
    

    cnxnFactory會(huì)賦值給quorumPeerquorumPeer.setCnxnFactory(cnxnFactory);

  2. 實(shí)例化quorumPeer并設(shè)值

    quorumPeer = getQuorumPeer();
    // 設(shè)置集群所有的peer赶站,集群機(jī)器之間互相通信
    quorumPeer.setQuorumPeers(config.getServers());
    ...
    

    這個(gè)就是根據(jù)配置中server.id解析出來的幔虏,如

    server.1=localhost:2888:3888
    server.2=localhost:2887:3887
    server.3=localhost:2886:3886
    
  3. 創(chuàng)建持久化文件管理器FileTxnSnapLog,并給quorumPeer賦值

    quorumPeer.setTxnFactory(new FileTxnSnapLog(
            new File(config.getDataLogDir()),
            new File(config.getDataDir())));
    
  4. 創(chuàng)建內(nèi)存數(shù)據(jù)庫(kù)贝椿,并賦值給quorumPeer

    quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
    
  5. 初始化并啟動(dòng)quorumPeer

    quorumPeer.initialize();
    quorumPeer.start();
    quorumPeer.join();
    

    QuorumPeer#start方法

    //QuorumPeer#start
    public synchronized void start() {
      loadDataBase();
      cnxnFactory.start();        
      startLeaderElection();
      super.start();
    }
    

    啟動(dòng)quorumPeer步驟有

    • 加載內(nèi)存數(shù)據(jù)庫(kù)
    • 啟動(dòng)cnxnFactory想括,客戶端連接的IO線程
    • 集群選舉
    • 選舉線程啟動(dòng)
    1. 集群版加載內(nèi)存數(shù)據(jù)庫(kù)會(huì)去分析當(dāng)前的Epoch
    private long acceptedEpoch = -1;
    private long currentEpoch = -1;
    
    1. 啟動(dòng)cnxnFactory后,這時(shí)候客戶端IO線程是沒法工作的烙博,因?yàn)樵趧?chuàng)建客戶端連接的時(shí)候需要zkServer變量瑟蜈,處理調(diào)用鏈

      protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) {
        return new NIOServerCnxn(zkServer, sock, sk, this);
      }
      

      需要等集群選舉完成、數(shù)據(jù)同步完成后渣窜,為其賦值铺根,才能開啟工作

    所以先主要分析集群選舉和選舉線程啟動(dòng)

集群選舉

集群選舉需要當(dāng)前peer與其他機(jī)器在選舉端口上建立連接,然后發(fā)送投票進(jìn)行選舉乔宿,選舉端口在配置文件中配置

server.id - This is the host:port[:port] that the server with the given id will use for the quorum protocol.

其中夷都,第一個(gè)端口用于指定Follower服務(wù)器與Leader進(jìn)行運(yùn)行時(shí)通信和數(shù)據(jù)同步時(shí)所使用的端口,第二個(gè)端口則專門用于進(jìn)行Leader選舉過程中的投票通信予颤,在初始化時(shí)``quorumPeer`為其賦值囤官。

  1. 初始化投票
    QuorumPeer#startLeaderElection方法初始化投票

    • 創(chuàng)建當(dāng)前投票,優(yōu)先給自己投票
      currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());

    • 創(chuàng)建選舉算法蛤虐,默認(rèn)electionType=3党饮,也就是FastLeaderElection

      // QuorumPeer#createElectionAlgorithm
      case 3:
        qcm = createCnxnManager();
         // 監(jiān)聽連接
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null) {
          listener.start();
          le = new FastLeaderElection(this, qcm);
        }
      

      創(chuàng)建Leader選舉所需的網(wǎng)絡(luò)IO層QuorumCnxManager,同時(shí)啟動(dòng)對(duì)Leader選舉端口的監(jiān)聽驳庭,等待集群中其他服務(wù)器創(chuàng)建連接刑顺。

調(diào)用start方法啟動(dòng)線程也颤,進(jìn)入run方法

  1. 注冊(cè)JMX服務(wù)

    jmxQuorumBean = new QuorumBean(this);
    MBeanRegistry.getInstance().register(jmxQuorumBean, null);
    ...
    
  2. 檢測(cè)當(dāng)前服務(wù)器狀態(tài)蜈漓,并根據(jù)當(dāng)前狀態(tài)做處理

    switch (getPeerState()) {
      case LOOKING:
        ...
      case OBSERVING:
        ...
      case FOLLOWING:
        ...
      case LEADING:
         ...
    }
    

    集群?jiǎn)?dòng)狀態(tài)當(dāng)然是LOOKING

    private ServerState state = ServerState.LOOKING;
    

    LOOKING狀態(tài)的機(jī)器需要去獲取集群的Leader,如果當(dāng)前沒有Leader或详,則進(jìn)入選舉模式贝淤。

    setCurrentVote(makeLEStrategy().lookForLeader());
    
  3. Leader選舉
    選舉算法以默認(rèn)的FastLeaderElection#lookForLeader為例柒竞,該方法開始新一輪Leader選舉。每當(dāng)QuorumPeer將其狀態(tài)更改為L(zhǎng)OOKING時(shí)播聪,就會(huì)調(diào)用此方法朽基,并向所有其他peers發(fā)送通知。具體選舉算法單獨(dú)分析离陶。

  4. 完成選舉后服務(wù)器狀態(tài)為:OBSERVING稼虎、FOLLOWINGLEADING招刨,對(duì)應(yīng)角色分別是Observer霎俩、FollowerLeader沉眶,ObserverFollower的區(qū)別在于Observer不參與任何投票打却。

角色交互

完成集群選舉后,集群Leader和Followers之間需要進(jìn)行數(shù)據(jù)同步沦寂,并在后續(xù)的消息處理中学密,F(xiàn)ollowers會(huì)將事物請(qǐng)求以Request的形式轉(zhuǎn)發(fā)給Leader。

Follower

當(dāng)節(jié)點(diǎn)中狀態(tài)為FOLLOWING時(shí)传藏,將設(shè)置當(dāng)前角色為Follower腻暮,包括創(chuàng)建Follower并啟動(dòng)

setFollower(makeFollower(logFactory));
follower.followLeader();

Follower#followLeader方法

void followLeader() throws InterruptedException {
  ...
  QuorumServer leaderServer = findLeader();            
  try {
    connectToLeader(leaderServer.addr, leaderServer.hostname);
    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

    // leader zxid比自己的zxid還要小,出錯(cuò)了
    long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
    if (newEpoch < self.getAcceptedEpoch()) {
      LOG.error("");
      throw new IOException("Error: Epoch of leader is lower");
    }
    syncWithLeader(newEpochZxid);                
    QuorumPacket qp = new QuorumPacket();
    while (this.isRunning()) {
      readPacket(qp);
      processPacket(qp);
    }
  }
  ...
}

步驟

  1. 找到當(dāng)前l(fā)eader毯侦,通過投票查找

    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
      if (s.id == current.getId()) {
        s.recreateSocketAddresses();
        leaderServer = s;
        break;
      }
    }
    
  2. 連接到leader哭靖,重試連接上一步找到的leader

    sock = new Socket();        
    sock.setSoTimeout(self.tickTime * self.initLimit);
    for (int tries = 0; tries < 5; tries++) {
      sock.connect(addr, self.tickTime * self.syncLimit);
    }
    
  3. 向leader注冊(cè),
    這一步Follower向Leader同步投票的Epoch以及Follower的自己的最新事務(wù)id侈离、Epoch试幽,并接受Leader的Epoch。

  4. 同步數(shù)據(jù)
    上一步Leader收到Follower最新的zxid后卦碾,根據(jù)自己的zxid決定采用哪種方式同步數(shù)據(jù)铺坞。在Learner#syncWithLeader方法中起宽,Leader通知Follower以何種方式進(jìn)行同步

    readPacket(qp);
    if (qp.getType() == Leader.DIFF) {
     // 差異化同步
      snapshotNeeded = false;
    } else if (qp.getType() == Leader.SNAP) {
      // 全量同步
      zk.getZKDatabase().clear();
      zk.getZKDatabase().deserializeSnapshot(leaderIs);
      String signature = leaderIs.readString("signature");
      if (!signature.equals("BenWasHere")) {
        LOG.error("Missing signature. Got " + signature);
        throw new IOException("Missing signature");                   
      }
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    } else if (qp.getType() == Leader.TRUNC) {
      //截?cái)嗳罩?  boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
      if (!truncated) {
        System.exit(13);
      }
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    } else {
      System.exit(13);
    }
    

    Follower根據(jù)同步類型,處理本地日志文件及本地?cái)?shù)據(jù)庫(kù)

    • DIFF:差異化同步
    • SNAP:全量同步
    • TRUNC:截?cái)嗳罩?/li>

    然后Leader開始發(fā)送數(shù)據(jù)同步

    // 數(shù)據(jù)同步知道接收到UPTODATE類型的數(shù)據(jù)包結(jié)束
    outerLoop:
    while (self.isRunning()) {
      readPacket(qp);
      switch(qp.getType()) {
        // 投票
        case Leader.PROPOSAL:
          PacketInFlight pif = new PacketInFlight();
          pif.hdr = new TxnHeader();
          pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
          if (pif.hdr.getZxid() != lastQueued + 1) {
            
          }
          lastQueued = pif.hdr.getZxid();
          packetsNotCommitted.add(pif);
          break;
        // 提交
        case Leader.COMMIT:
          if (!writeToTxnLog) {
            pif = packetsNotCommitted.peekFirst();
            if (pif.hdr.getZxid() != qp.getZxid()) {
              LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
            } else {
              zk.processTxn(pif.hdr, pif.rec);
              packetsNotCommitted.remove();
            }
          } else {
            packetsCommitted.add(qp.getZxid());
          }
          break;
        // 只有observer才能得到這種類型的包济榨。我們將此視為接收PROPOSAL和COMMIT坯沪。
        case Leader.INFORM:
          PacketInFlight packet = new PacketInFlight();
          packet.hdr = new TxnHeader();
          packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
          // Log warning message if txn comes out-of-order
          if (packet.hdr.getZxid() != lastQueued + 1) {
            
          }
          lastQueued = packet.hdr.getZxid();
          if (!writeToTxnLog) {
            // Apply to db directly if we haven't taken the snapshot
            zk.processTxn(packet.hdr, packet.rec);
          } else {
            packetsNotCommitted.add(packet);
            packetsCommitted.add(qp.getZxid());
          }
          break;
        // 同步完成
        case Leader.UPTODATE:
          if (isPreZAB1_0) {
            zk.takeSnapshot();
            self.setCurrentEpoch(newEpoch);
          }
          self.cnxnFactory.setZooKeeperServer(zk);                
          break outerLoop;
        case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
          File updating = new File(self.getTxnFactory().getSnapDir(),
                                   QuorumPeer.UPDATING_EPOCH_FILENAME);
          if (!updating.exists() && !updating.createNewFile()) {
            throw new IOException("Failed to create " + updating.toString());
          }
          if (snapshotNeeded) {
            zk.takeSnapshot();
          }
          self.setCurrentEpoch(newEpoch);
          if (!updating.delete()) {
            throw new IOException("Failed to delete " + updating.toString());
          }
          //需要將數(shù)據(jù)寫入事務(wù)日志
          writeToTxnLog = true;
          isPreZAB1_0 = false;
          writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
          break;
      }
    }
    

    同步完成后

    發(fā)送響應(yīng)

    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    writePacket(ack, true);
    

    開始接收客戶端請(qǐng)求,這個(gè)zk在不同角色的節(jié)點(diǎn)上是不同的角色擒滑,FollowerZooKeeperServer腐晾、ObserverZooKeeperServer

    zk.startup();
    

    還需要補(bǔ)充內(nèi)存數(shù)據(jù)庫(kù)中snapshot與log之間的差異

  5. 不斷與Leader通信,同步數(shù)據(jù)

    while (this.isRunning()) {
      readPacket(qp);
      processPacket(qp);
    }
    

    Follower#processPacket方法檢查在qp中接收的數(shù)據(jù)包丐一,并根據(jù)其內(nèi)容進(jìn)行分發(fā)藻糖。

    protected void processPacket(QuorumPacket qp) throws IOException{
      switch (qp.getType()) {
         // 心跳
        case Leader.PING:            
          ping(qp);            
          break;
        // 事務(wù)投票
        case Leader.PROPOSAL:            
          TxnHeader hdr = new TxnHeader();
          Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
          if (hdr.getZxid() != lastQueued + 1) {
          }
          lastQueued = hdr.getZxid();
          fzk.logRequest(hdr, txn);
          break;
        // 提交事物
        case Leader.COMMIT:
          fzk.commit(qp.getZxid());
          break;
        case Leader.UPTODATE:
          LOG.error("Received an UPTODATE message after Follower started");
          break;
        case Leader.REVALIDATE:
          revalidate(qp);
          break;
        // 通知Learner服務(wù)器已經(jīng)完成了Sync操作
        case Leader.SYNC:
          fzk.sync();
          break;
        default:
          LOG.error("Invalid packet type: {} received by Observer", qp.getType());
      }
    }
    

    Follower后續(xù)還需要不斷與Leader通信,進(jìn)行事務(wù)投票库车。

至此Follower開始對(duì)外提供服務(wù)巨柒。

Leader

Follower類似,

setLeader(makeLeader(logFactory));
leader.lead();

QuorumPeer#makeLeader方法凝颇,

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
  return new Leader(this, new LeaderZooKeeperServer(logFactory,
                    this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}

Leader內(nèi)部處理請(qǐng)求的是LeaderZooKeeperServer

Leader#lead的主要流程

  1. 加載內(nèi)存數(shù)據(jù)庫(kù)

    zk.loadData();
    
  2. 創(chuàng)建LearnerCnxAcceptor潘拱,啟動(dòng)等待來自新followers的連接請(qǐng)求的線程。

    cnxAcceptor = new LearnerCnxAcceptor();
    cnxAcceptor.start();
    

    Leader.LearnerCnxAcceptor#run方法中

    Socket s = ss.accept();
    // start with the initLimit, once the ack is processed
    // in LearnerHandler switch to the syncLimit
    s.setSoTimeout(self.tickTime * self.initLimit);
    s.setTcpNoDelay(nodelay);
    BufferedInputStream is = new BufferedInputStream(s.getInputStream());
    // 為每個(gè)Learner創(chuàng)建一條線程拧略,處理投票芦岂、數(shù)據(jù)同步
    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
    fh.start();
    
  3. 等待Leaner響應(yīng)Ack

    readyToStart = true;
    long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
    
    zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
    
    synchronized(this){
      lastProposed = zk.getZxid();
    }
    
    newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
    
    waitForEpochAck(self.getId(), leaderStateSummary);
    self.setCurrentEpoch(epoch);
    
    waitForNewLeaderAck(self.getId(), zk.getZxid());
    

    準(zhǔn)備完畢,只需要等待過半數(shù)的Leaner的回復(fù)即可對(duì)外工作垫蛆,在LeanerHandler中也會(huì)調(diào)用waitForEpochAck禽最、waitForEpochAck喚醒Leader

  4. 對(duì)外提供服務(wù)

    startZkServer();
    
  5. 心跳,和Leaner备し梗活

至此ZooKeeper集群模式啟動(dòng)完畢川无,整個(gè)集群開始對(duì)外提供服務(wù)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末虑乖,一起剝皮案震驚了整個(gè)濱河市懦趋,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌疹味,老刑警劉巖仅叫,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異糙捺,居然都是意外死亡诫咱,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門洪灯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來坎缭,“玉大人,你說我怎么就攤上這事√秃簦” “怎么了坏快?”我有些...
    開封第一講書人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)哄尔。 經(jīng)常有香客問我假消,道長(zhǎng),這世上最難降的妖魔是什么岭接? 我笑而不...
    開封第一講書人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮臼予,結(jié)果婚禮上鸣戴,老公的妹妹穿的比我還像新娘。我一直安慰自己粘拾,他們只是感情好窄锅,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著缰雇,像睡著了一般入偷。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上械哟,一...
    開封第一講書人閱讀 51,763評(píng)論 1 307
  • 那天疏之,我揣著相機(jī)與錄音,去河邊找鬼暇咆。 笑死锋爪,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的爸业。 我是一名探鬼主播其骄,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼扯旷!你這毒婦竟也來了拯爽?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤钧忽,失蹤者是張志新(化名)和其女友劉穎毯炮,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體惰瓜,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡否副,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了崎坊。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片备禀。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出曲尸,到底是詐尸還是另有隱情赋续,我是刑警寧澤,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布另患,位于F島的核電站纽乱,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏昆箕。R本人自食惡果不足惜鸦列,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望鹏倘。 院中可真熱鬧薯嗤,春花似錦、人聲如沸纤泵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽捏题。三九已至玻褪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間公荧,已是汗流浹背带射。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留稚矿,地道東北人庸诱。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像晤揣,于是被迫代替她去往敵國(guó)和親桥爽。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容