ZooKeeper Source Code Walkthrough (1): Leader Election

1. In QuorumPeerMain, the main method calls main.initializeAndRun(args) to start the node object.

 protected void initializeAndRun(String[] args) throws ConfigException,IOException
{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }
    // Start and schedule the purge task (periodic cleanup of the data directory)
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
    // If the config lists quorum servers, call runFromConfig; otherwise start in standalone mode via ZooKeeperServerMain.main(args)
    if (args.length == 1 && config.servers.size() > 0) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args); 
    }
}
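
The branch above depends only on whether the parsed configuration contains any server entries. As a rough illustration (this is not ZooKeeper's own parser), the sketch below uses java.util.Properties to count server.N=host:quorumPort:electionPort entries and then picks a mode with the same check. The class name ModeChooser, its method names, and the sample host names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not ZooKeeper code: mimics the quorum/standalone decision.
class ModeChooser {
    // Counts keys of the form "server.<id>" in zoo.cfg-style properties text.
    static int countServers(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int servers = 0;
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                servers++;
            }
        }
        return servers;
    }

    // Same shape as the check in initializeAndRun: any server entry selects quorum mode.
    static String chooseMode(String cfgText) {
        return countServers(cfgText) > 0 ? "quorum" : "standalone";
    }

    public static void main(String[] args) {
        String quorumCfg = "dataDir=/tmp/zk\n"
                + "server.1=host1:2888:3888\n"
                + "server.2=host2:2888:3888\n"
                + "server.3=host3:2888:3888\n";
        System.out.println(chooseMode(quorumCfg));            // prints "quorum"
        System.out.println(chooseMode("dataDir=/tmp/zk\n"));  // prints "standalone"
    }
}
```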

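2. Internally, initializeAndRun calls runFromConfig(QuorumPeerConfig config), which loads the node's configuration, starts the node by calling its start(), and then calls join() so that the main thread waits for the peer thread and execution becomes sequential.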

public void runFromConfig(QuorumPeerConfig config) throws IOException {
  try {
      ManagedUtil.registerLog4jMBeans();
  } catch (JMException e) {
      LOG.warn("Unable to register log4j JMX control", e);
  }
  LOG.info("Starting quorum peer");
  try {
      ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
      cnxnFactory.configure(config.getClientPortAddress(),
                            config.getMaxClientCnxns());
      quorumPeer = new QuorumPeer();
      quorumPeer.setClientPortAddress(config.getClientPortAddress());
      quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  new File(config.getDataLogDir()),
                  new File(config.getDataDir())));
      quorumPeer.setQuorumPeers(config.getServers());
      quorumPeer.setElectionType(config.getElectionAlg());
      quorumPeer.setMyid(config.getServerId());
      quorumPeer.setTickTime(config.getTickTime());
      quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
      quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
      quorumPeer.setInitLimit(config.getInitLimit());
      quorumPeer.setSyncLimit(config.getSyncLimit());
      quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
      quorumPeer.setCnxnFactory(cnxnFactory);
      quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
      quorumPeer.setLearnerType(config.getPeerType());
      quorumPeer.setSyncEnabled(config.getSyncEnabled());
      quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
      quorumPeer.start(); // start the node
      quorumPeer.join();  // block the main thread until the peer thread exits
  } catch (InterruptedException e) {
      // warn, but generally this is ok
      LOG.warn("Quorum Peer interrupted", e);
  }
}
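
The start()/join() pair works because QuorumPeer is itself a Thread. A minimal sketch of that pattern, assuming a hypothetical class PeerThreadSketch in place of QuorumPeer: start() does some setup on the caller's thread, super.start() spawns the thread that executes run(), and join() makes the caller wait for it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for QuorumPeer: a Thread whose start() does setup first.
class PeerThreadSketch extends Thread {
    final List<String> events = Collections.synchronizedList(new ArrayList<String>());

    @Override
    public void start() {
        events.add("setup");   // runs on the caller's thread
        super.start();         // spawns the new thread, which executes run()
    }

    @Override
    public void run() {
        events.add("run");     // runs on the new thread
    }

    // Starts the peer and blocks until its thread has finished.
    static List<String> startAndJoin() {
        PeerThreadSketch peer = new PeerThreadSketch();
        peer.start();
        try {
            peer.join();       // the caller waits here, as runFromConfig does
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        peer.events.add("joined");
        return peer.events;
    }

    public static void main(String[] args) {
        System.out.println(startAndJoin());   // prints "[setup, run, joined]"
    }
}
```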

3. QuorumPeer.start() is called to start the node. The method looks like this:

@Override
public synchronized void start() {
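    loadDataBase(); // load the data
    cnxnFactory.start(); // start the client-facing server; there are two implementations, Netty and NIO
    startLeaderElection(); // prepare the leader election
    super.start(); // QuorumPeer extends Thread, so this starts the thread that executes QuorumPeer.run()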
}
// With the Netty implementation, cnxnFactory.start() looks like this: it binds localAddress and opens a channel
@Override
public void start() {
    LOG.info("binding to port " + localAddress);
    parentChannel = bootstrap.bind(localAddress);
}
startLeaderElection() prepares the election. It is synchronized so that only one thread can execute it at a time.
synchronized public void startLeaderElection() {
    try {
        // First create the local vote
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    // Use the quorumPeers entries from the config file together with myid to set myQuorumAddr, the local node's address
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    // Create the configured election algorithm
    this.electionAlg = createElectionAlgorithm(electionType);
}

4. createElectionAlgorithm creates the election algorithm:

protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
            
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this); // @Deprecated since ZooKeeper 3.4
        break;
    case 1:
        le = new AuthFastLeaderElection(this); // @Deprecated since ZooKeeper 3.4
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true); // @Deprecated since ZooKeeper 3.4
        break;
    case 3:
        qcm = new QuorumCnxManager(this); // create this node's connection manager
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start(); // start the listener thread that accepts incoming election connections
            le = new FastLeaderElection(this, qcm); // create the FastLeaderElection (FLE) instance
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

5. QuorumCnxManager maintains the connections to every other node. Its inner class Listener opens a ServerSocket to accept incoming connections and retries at most three times if that fails; RecvWorker receives messages and SendWorker sends them.
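
The "at most three times" behaviour of the Listener is a bounded retry loop. The sketch below shows only that control flow, under the assumption that a bind attempt can be modelled as a function returning true on success. The names BoundedRetrySketch and bindWithRetries are hypothetical, and no real socket is opened.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a bounded-retry loop; the real Listener opens a ServerSocket instead.
class BoundedRetrySketch {
    static final int MAX_RETRIES = 3;

    // Runs the bind attempt up to MAX_RETRIES times.
    // Returns the 1-based attempt that succeeded, or -1 if every attempt failed.
    static int bindWithRetries(BooleanSupplier bindAttempt) {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            if (bindAttempt.getAsBoolean()) {
                return attempt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Simulated bind that fails once and then succeeds.
        System.out.println(bindWithRetries(() -> ++calls[0] >= 2));   // prints "2"
        // Simulated bind that never succeeds.
        System.out.println(bindWithRetries(() -> false));             // prints "-1"
    }
}
```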

6. FastLeaderElection is the election class; most of the election process is implemented in it. Creating it calls its starter method, which performs some initialization:

private void starter(QuorumPeer self, QuorumCnxManager manager) {
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);
}

At this point the initialization needed for the election is essentially complete.

7. QuorumPeer then calls super.start(), which starts the thread and thereby runs this class's run method; this is where the actual election begins.

@Override
public void run() {
    setName("QuorumPeer" + "[myid=" + getId() + "]" +
            cnxnFactory.getLocalAddress());
    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);
        for(QuorumServer s: getView().values()){
            ZKMBeanInfo p;
            if (getId() == s.id) {
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {
                p = new RemotePeerBean(s);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }
    // The code above registers the JMX beans
    try {
        /*
         * Main loop
         */
         // Watch the node's state and dispatch to the matching handler
        while (running) {
            switch (getPeerState()) {
            case LOOKING:
                LOG.info("LOOKING");
                // Is read-only mode enabled?
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    // Read-only mode is not covered here
                    // ... read-only code omitted ...
                } else {
                    try {
                        setBCVote(null);
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );                        
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case LEADING:
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        try {
            MBeanRegistry.getInstance().unregisterAll();
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
    }
}
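
The main loop is a small state machine: LOOKING runs an election and sets the next state, and every other role falls back to LOOKING in its finally block when it ends. The sketch below reproduces only that shape. PeerLoopSketch and its scripted election results are hypothetical, and the role methods are replaced by empty stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the QuorumPeer main loop; roles are simulated, no networking.
class PeerLoopSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    private ServerState state = ServerState.LOOKING;
    private final ServerState[] electionResults;   // scripted outcomes of successive elections
    private int elections = 0;
    final List<ServerState> trace = new ArrayList<ServerState>();

    PeerLoopSketch(ServerState... electionResults) {
        this.electionResults = electionResults;
    }

    // Runs until the scripted election results are exhausted and returns the visited states.
    List<ServerState> run() {
        while (true) {
            trace.add(state);
            switch (state) {
            case LOOKING:
                if (elections == electionResults.length) {
                    return trace;                      // stand-in for running == false
                }
                state = electionResults[elections++];  // stand-in for lookForLeader()
                break;
            default:
                try {
                    // stand-in for lead(), followLeader() or observeLeader()
                } finally {
                    state = ServerState.LOOKING;       // every role ends in a new election
                }
                break;
            }
        }
    }

    public static void main(String[] args) {
        PeerLoopSketch peer = new PeerLoopSketch(ServerState.FOLLOWING, ServerState.LEADING);
        System.out.println(peer.run());
        // prints "[LOOKING, FOLLOWING, LOOKING, LEADING, LOOKING]"
    }
}
```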

8. run calls makeLEStrategy(), which returns the FLE object, and then calls its lookForLeader():

public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    // The code above registers the JMX bean
    if (self.start_fle == 0) {
       self.start_fle = System.currentTimeMillis(); // time at which the election started
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); // votes received in this round
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); // votes from peers that are already FOLLOWING or LEADING
        int notTimeout = finalizeWait;

        // Under the lock: increment logicalclock and create the local proposal
        synchronized(this){
            logicalclock++; // advance the logical clock (election round) by 1
            // This peer's sid, last logged zxid and current epoch; getVote() returns
            // them when voting, and a vote consists mainly of these three values
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications(); // first round: vote for ourselves and notify every node

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        // Keep looping as long as we remain in the LOOKING (election) state
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS); // next notification received from the other nodes

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            // No reply: if all queued messages were delivered, notify every node again;
            // if some are still undelivered, reconnect to the peers
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if(self.getVotingView().containsKey(n.sid)) {
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view.
                 */
                 // A vote arrived from another server
                switch (n.state) {
                case LOOKING:
                    // If notification > current, replace and send messages out
                    // If the received election round is newer than ours (n.electionEpoch > logicalclock),
                    // adopt it as logicalclock, update the local proposal, and notify all peers again
                    if (n.electionEpoch > logicalclock) {
                        logicalclock = n.electionEpoch;
                        recvset.clear(); // discard all votes received so far
                        // If the received proposal beats our own initial vote, adopt it;
                        // otherwise fall back to our own initial vote
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock) { // received round is older than ours: ignore it
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) { // same round: if the received proposal is better, adopt it and notify all peers again
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }
                    // Record every vote received
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    // termPredicate returns true once the current proposal has enough votes in this round to elect a leader
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock, proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        // If a better proposal arrives while we wait, put it back into recvqueue
                        // and return to the top of the outer while loop to process it
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                         // A leader has been chosen: if no further message arrives, the election is over
                         // and the new leader stands. Set this peer's state by comparing sids: if we are
                         // the leader, the state becomes LEADING and run() then takes the LEADING case;
                         // also build endVote.
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(proposedLeader,
                                                    proposedZxid,
                                                    logicalclock,
                                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if(n.electionEpoch == logicalclock){
                        recvset.put(n.sid, new Vote(n.leader,
                                                      n.zxid,
                                                      n.electionEpoch,
                                                      n.peerEpoch));
                       
                        if(ooePredicate(recvset, outofelection, n)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader, 
                                    n.zxid, 
                                    n.electionEpoch, 
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                                                        n.leader,
                                                        n.zxid,
                                                        n.electionEpoch,
                                                        n.peerEpoch,
                                                        n.state));
       
                    if(ooePredicate(outofelection, outofelection, n)) {
                        synchronized(this){
                            logicalclock = n.electionEpoch;
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                                n.zxid,
                                                n.electionEpoch,
                                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}
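
Two checks drive the LOOKING branch above: totalOrderPredicate decides whether a received proposal beats the current one, and termPredicate decides whether the current proposal has a quorum. The sketch below is a simplified reading of both, assuming a plain majority quorum (no weights) and ignoring electionEpoch when comparing votes for equality. The class VoteSketch and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of the two predicates used by lookForLeader().
class VoteSketch {
    final long leader;     // sid of the proposed leader
    final long zxid;       // last zxid seen by the proposed leader
    final long peerEpoch;  // epoch of the proposed leader

    VoteSketch(long leader, long zxid, long peerEpoch) {
        this.leader = leader;
        this.zxid = zxid;
        this.peerEpoch = peerEpoch;
    }

    // True if this vote wins over the other: higher epoch, then higher zxid, then higher sid.
    boolean beats(VoteSketch other) {
        if (peerEpoch != other.peerEpoch) {
            return peerEpoch > other.peerEpoch;
        }
        if (zxid != other.zxid) {
            return zxid > other.zxid;
        }
        return leader > other.leader;
    }

    boolean sameProposal(VoteSketch other) {
        return leader == other.leader && zxid == other.zxid && peerEpoch == other.peerEpoch;
    }

    // True if strictly more than half of the ensemble backs the given proposal.
    static boolean hasQuorum(Map<Long, VoteSketch> received, VoteSketch proposal, int ensembleSize) {
        int supporters = 0;
        for (VoteSketch v : received.values()) {
            if (v.sameProposal(proposal)) {
                supporters++;
            }
        }
        return supporters > ensembleSize / 2;
    }

    public static void main(String[] args) {
        VoteSketch mine = new VoteSketch(1, 0x100000005L, 1);
        VoteSketch theirs = new VoteSketch(3, 0x100000007L, 1);
        System.out.println(theirs.beats(mine));              // prints "true": same epoch, larger zxid

        Map<Long, VoteSketch> recvset = new HashMap<Long, VoteSketch>();
        recvset.put(1L, theirs);                              // we adopted the better proposal
        recvset.put(3L, theirs);
        System.out.println(hasQuorum(recvset, theirs, 3));   // prints "true": 2 of 3 servers
    }
}
```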

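When a poll on recvqueue times out, lookForLeader() doubles notTimeout up to maxNotificationInterval. The sketch below isolates that arithmetic. The starting value of 200 ms and the cap of 60000 ms are assumptions about the constants finalizeWait and maxNotificationInterval, and the class name BackoffSketch is hypothetical.

```java
// Hypothetical extraction of the timeout arithmetic from lookForLeader().
class BackoffSketch {
    static final int FINALIZE_WAIT_MS = 200;                 // assumed initial poll timeout
    static final int MAX_NOTIFICATION_INTERVAL_MS = 60000;   // assumed upper bound

    // Doubles the timeout after an empty poll, never exceeding the cap.
    static int next(int currentTimeoutMs) {
        int doubled = currentTimeoutMs * 2;
        return doubled < MAX_NOTIFICATION_INTERVAL_MS ? doubled : MAX_NOTIFICATION_INTERVAL_MS;
    }

    public static void main(String[] args) {
        int timeout = FINALIZE_WAIT_MS;
        for (int i = 0; i < 10; i++) {
            System.out.println(timeout);   // 200, 400, 800, ... 51200, then 60000
            timeout = next(timeout);
        }
    }
}
```
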
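9. Once a leader has been elected there are two cases. If this peer is the leader, its state becomes LEADING; otherwise its state becomes FOLLOWING or OBSERVING, depending on its LearnerType.
In QuorumPeer, the key code for the state transitions is: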

{

    case OBSERVING:
        try {
            LOG.info("OBSERVING");
            setObserver(makeObserver(logFactory));
            observer.observeLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e );                        
        } finally {
            observer.shutdown();
            setObserver(null);
            setPeerState(ServerState.LOOKING);
        }
        break;
    case FOLLOWING:
        try {
            LOG.info("FOLLOWING");
            setFollower(makeFollower(logFactory));
            follower.followLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
        } finally {
            follower.shutdown();
            setFollower(null);
            setPeerState(ServerState.LOOKING);
        }
        break;
    case LEADING:
        LOG.info("LEADING");
        try {
            setLeader(makeLeader(logFactory));
            leader.lead();
            setLeader(null);
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
        } finally {
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                setLeader(null);
            }
            setPeerState(ServerState.LOOKING);
        }
        break;
    }
}
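
The state chosen at the end of an election can be condensed into one function: the winner becomes LEADING, and everyone else asks learningState(), which maps the learner type to FOLLOWING or OBSERVING. A minimal sketch follows, with a hypothetical class LearningStateSketch and local copies of the two enums.

```java
// Hypothetical condensation of the state choice made when an election ends.
class LearningStateSketch {
    enum LearnerType { PARTICIPANT, OBSERVER }
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    // The elected server leads; other servers follow or observe according to their type.
    static ServerState stateAfterElection(long myId, long electedLeader, LearnerType type) {
        if (electedLeader == myId) {
            return ServerState.LEADING;
        }
        return type == LearnerType.PARTICIPANT ? ServerState.FOLLOWING : ServerState.OBSERVING;
    }

    public static void main(String[] args) {
        System.out.println(stateAfterElection(3, 3, LearnerType.PARTICIPANT));   // prints "LEADING"
        System.out.println(stateAfterElection(1, 3, LearnerType.PARTICIPANT));   // prints "FOLLOWING"
        System.out.println(stateAfterElection(4, 3, LearnerType.OBSERVER));      // prints "OBSERVING"
    }
}
```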

Let us look at the LEADING state first. setLeader(makeLeader(logFactory)) creates and installs a Leader object; the important part is the leader.lead() method, shown below.

void lead() throws IOException, InterruptedException {
    self.end_fle = System.currentTimeMillis();
    LOG.info("LEADING - LEADER ELECTION TOOK - " +
          (self.end_fle - self.start_fle));
    self.start_fle = 0;
    self.end_fle = 0;

    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
    // The call above registers the leader's JMX bean
    try {
        self.tick = 0;
        zk.loadData(); // load data, initialize the zxid, and clean up dead sessions
        // Summary of the leader's state (current epoch and last processed zxid)
        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start thread that waits for connection requests from 
        // new followers.
        // Start the acceptor thread that accepts follower connections; the ServerSocket
        // itself was already created when the Leader object was set up (setLeader)
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();
        
        readyToStart = true;
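        // The leader submits its own acceptedEpoch, then checks whether more than half of the
        // ensemble has reported an acceptedEpoch; until that quorum is reached, the leader waits.
        // Meanwhile each follower sends its acceptedEpoch to the leader through Learner.registerWithLeader.
        // LearnerHandler takes the high 32 bits of the follower's zxid as that follower's epoch.
        // The new epoch is the highest reported epoch + 1.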
        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
        
        // With the epoch decided, shift it left by 32 bits to form the new zxid
        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
        // Set lastProposed
        synchronized(this){
            lastProposed = zk.getZxid();
        }
        // Build the NEWLEADER packet, which carries the new zxid and is sent to the followers
        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                null, null);


        if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
            LOG.info("NEWLEADER proposal has Zxid of "
                    + Long.toHexString(newLeaderProposal.packet.getZxid()));
        }
        // After the new zxid has been sent to the followers, wait for their ACKEPOCH confirmations;
        // this thread blocks in wait(). See the Learner/LearnerHandler interaction for details: each
        // follower's LearnerHandler calls leader.waitForEpochAck() and waits too, until one of the
        // handlers sees that more than half have acknowledged and wakes everyone with notifyAll().
        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);

        // We have to get at least a majority of servers in sync with
        // us. We do this by waiting for the NEWLEADER packet to get
        // acknowledged
        try {
            // After NEWLEADER has been sent, wait for the followers' acknowledgements; this also needs a quorum, much like waitForEpochAck
            waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
        } catch (InterruptedException e) {
            shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                    + getSidSetString(newLeaderProposal.ackSet) + " ]");
            HashSet<Long> followerSet = new HashSet<Long>();
            for (LearnerHandler f : learners)
                followerSet.add(f.getSid());
                
            if (self.getQuorumVerifier().containsQuorum(followerSet)) {
                LOG.warn("Enough followers present. "
                        + "Perhaps the initTicks need to be increased.");
            }
            Thread.sleep(self.tickTime);
            self.tick++;
            return;
        }
        // Start the ZooKeeper server
        startZkServer();
        
        /**
         * WARNING: do not use this for anything other than QA testing
         * on a real cluster. Specifically to enable verification that quorum
         * can handle the lower 32bit roll-over issue identified in
         * ZOOKEEPER-1277. Without this option it would take a very long
         * time (on order of a month say) to see the 4 billion writes
         * necessary to cause the roll-over to occur.
         * 
         * This field allows you to override the zxid of the server. Typically
         * you'll want to set it to something like 0xfffffff0 and then
         * start the quorum, run some operations and see the re-election.
         */
        String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
        if (initialZxid != null) {
            long zxid = Long.parseLong(initialZxid);
            zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
        }
        
        if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
            self.cnxnFactory.setZooKeeperServer(zk);
        }
        // Everything is a go, simply start counting the ticks
        // WARNING: I couldn't find any wait statement on a synchronized
        // block that would be notified by this notifyAll() call, so
        // I commented it out
        //synchronized (this) {
        //    notifyAll();
        //}
        // We ping twice a tick, so we only update the tick every other
        // iteration
        boolean tickSkip = true;

        while (true) {
            Thread.sleep(self.tickTime / 2);
            if (!tickSkip) {
                self.tick++;
            }
            HashSet<Long> syncedSet = new HashSet<Long>();

            // lock on the followers when we use it.
            syncedSet.add(self.getId());

            for (LearnerHandler f : getLearners()) {
                // Synced set is used to check we have a supporting quorum, so only
                // PARTICIPANT, not OBSERVER, learners should be used
                if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                    syncedSet.add(f.getSid());
                }
                f.ping();
            }

          if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
            //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                // Lost quorum, shutdown
                shutdown("Not sufficient followers synced, only synced with sids: [ "
                        + getSidSetString(syncedSet) + " ]");
                // make sure the order is the same!
                // the leader goes to looking
                return;
          } 
          tickSkip = !tickSkip;
        }
    } finally {
        zk.unregisterJMX(this);
    }
}
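
lead() builds the new zxid with ZxidUtils.makeZxid(epoch, 0): the epoch occupies the high 32 bits and a per-epoch counter the low 32 bits. The sketch below restates that layout in a hypothetical class ZxidSketch; it is a reading of the bit arithmetic, not a copy of ZxidUtils.

```java
// Hypothetical restatement of the zxid layout: epoch in the high 32 bits, counter in the low 32.
class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 0);                       // first zxid of epoch 5
        System.out.println(Long.toHexString(zxid));       // prints "500000000"
        System.out.println(epochOf(zxid + 7));            // prints "5"
        System.out.println(counterOf(zxid + 7));          // prints "7"
    }
}
```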

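The comments in lead() describe getEpochToPropose as a barrier: each caller reports an accepted epoch, the highest one plus 1 becomes the new epoch, and callers wait until the leader and a majority have reported. The sketch below models that with wait/notifyAll. It is a simplification under stated assumptions: a plain majority quorum, the object's own monitor as the lock, and a caller-supplied timeout. The class EpochBarrierSketch and its demo are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical reduction of the leader's epoch negotiation to a wait/notifyAll barrier.
class EpochBarrierSketch {
    private final long leaderSid;
    private final int ensembleSize;
    private final Set<Long> connecting = new HashSet<Long>();
    private long epoch = -1;
    private boolean waitingForNewEpoch = true;

    EpochBarrierSketch(long leaderSid, int ensembleSize) {
        this.leaderSid = leaderSid;
        this.ensembleSize = ensembleSize;
    }

    // Called once by the leader for itself and once per connecting follower.
    // Blocks until the leader plus a majority have reported, then returns the agreed epoch.
    synchronized long getEpochToPropose(long sid, long lastAcceptedEpoch, long timeoutMs) {
        if (!waitingForNewEpoch) {
            return epoch;                              // already decided
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;             // highest reported epoch + 1
        }
        connecting.add(sid);
        if (connecting.contains(leaderSid) && connecting.size() > ensembleSize / 2) {
            waitingForNewEpoch = false;
            notifyAll();                               // wake every waiting caller
            return epoch;
        }
        long end = System.currentTimeMillis() + timeoutMs;
        long now = System.currentTimeMillis();
        while (waitingForNewEpoch && now < end) {
            try {
                wait(end - now);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a quorum", e);
            }
            now = System.currentTimeMillis();
        }
        if (waitingForNewEpoch) {
            throw new IllegalStateException("timed out waiting for a quorum");
        }
        return epoch;
    }

    // Leader (sid 1, epoch 4) and one follower (sid 2, epoch 6) in a 3-node ensemble.
    static long[] demo() {
        final EpochBarrierSketch barrier = new EpochBarrierSketch(1L, 3);
        final long[] results = new long[2];
        Thread follower = new Thread(new Runnable() {
            public void run() {
                results[1] = barrier.getEpochToPropose(2L, 6L, 5000L);
            }
        });
        follower.start();
        results[0] = barrier.getEpochToPropose(1L, 4L, 5000L);
        try {
            follower.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println(r[0] + " " + r[1]);   // prints "7 7"
    }
}
```

Whichever of the two callers arrives first waits; the second one completes the quorum and wakes it, so both see the same epoch.
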
10. Now look at the communication between Learner and LearnerHandler. LearnerHandler.run():

public void run() {
    try {
        tickOfNextAckDeadline = leader.self.tick
                + leader.self.initLimit + leader.self.syncLimit;

        ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
                .getInputStream()));
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        oa = BinaryOutputArchive.getArchive(bufferedOutput);

        QuorumPacket qp = new QuorumPacket();
        ia.readRecord(qp, "packet");
        if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
            LOG.error("First packet " + qp.toString()
                    + " is not FOLLOWERINFO or OBSERVERINFO!");
            return;
        }
        byte learnerInfoData[] = qp.getData();
        if (learnerInfoData != null) {
            if (learnerInfoData.length == 8) {
                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                this.sid = bbsid.getLong();
            } else {
                LearnerInfo li = new LearnerInfo();
                ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
                this.sid = li.getServerid();
                this.version = li.getProtocolVersion();
            }
        } else {
            this.sid = leader.followerCounter.getAndDecrement(); // no sid was sent: assign a unique id from a decrementing counter
        }

        LOG.info("Follower sid: " + sid + " : info : "
                + leader.self.quorumPeers.get(sid));
                    
        if (qp.getType() == Leader.OBSERVERINFO) {
              learnerType = LearnerType.OBSERVER;
        }            
        
        long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); // epoch taken from the high 32 bits of the zxid the learner sent
        
        long peerLastZxid;
        StateSummary ss = null;
        long zxid = qp.getZxid();
        // Compare this learner's lastAcceptedEpoch with the leader's current epoch and check whether
        // a quorum has reported; if not, wait. The connectingFollowers object serves as the exclusive lock.
        long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);


        // Send a Leader.LEADERINFO packet carrying the new epoch to the follower, then wait for
        // the follower's ACKEPOCH reply (the follower side of this exchange is in Learner)
        if (this.getVersion() < 0x10000) {
            // we are going to have to extrapolate the epoch information
            long epoch = ZxidUtils.getEpochFromZxid(zxid);
            ss = new StateSummary(epoch, zxid);
            // fake the message
            leader.waitForEpochAck(this.getSid(), ss);
        } else {
            byte ver[] = new byte[4];
            ByteBuffer.wrap(ver).putInt(0x10000);
            QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
            oa.writeRecord(newEpochPacket, "packet");
            bufferedOutput.flush();
            QuorumPacket ackEpochPacket = new QuorumPacket();
            ia.readRecord(ackEpochPacket, "packet");
            if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                LOG.error(ackEpochPacket.toString()
                        + " is not ACKEPOCH");
                return;
            }
            ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
            ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
            leader.waitForEpochAck(this.getSid(), ss);
        }
        peerLastZxid = ss.getLastZxid();




        // Start synchronizing data.
        
        /* the default to send to the follower */
        int packetToSend = Leader.SNAP;
        long zxidToSend = 0;
        long leaderLastZxid = 0;
        /** the packets that the follower needs to get updates from **/
        long updates = peerLastZxid;

        /* we are sending the diff check if we have proposals in memory to be able to 
         * send a diff to the 
         */ 
        // Get the read-write lock of the leader's ZKDatabase.
        ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
        ReadLock rl = lock.readLock(); // shared read lock: during sync the log can be read but not written
        try {
            rl.lock();        
            // Get the range of committed log entries held in the ZKDatabase.
            final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog(); // largest committed zxid
            final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog(); // smallest committed zxid
            LOG.info("Synchronizing with Follower sid: " + sid
                    +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
                    +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
                    +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));

            // Get the proposals that the ZKDatabase has already committed.
            LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();

            if (proposals.size() != 0) {
                LOG.debug("proposal size is {}", proposals.size());

                
                if ((maxCommittedLog >= peerLastZxid)
                        && (minCommittedLog <= peerLastZxid)) {
                    LOG.debug("Sending proposals to follower");

                    // as we look through proposals, this variable keeps track of previous
                    // proposal Id.
                    long prevProposalZxid = minCommittedLog;

                    // Keep track of whether we are about to send the first packet.
                    // Before sending the first packet, we have to tell the learner
                    // whether to expect a trunc or a diff
                    boolean firstPacket=true;

                    // If we are here, we can use committedLog to sync with
                    // follower. Then we only need to decide whether to
                    // send trunc or not
                    packetToSend = Leader.DIFF;
                    zxidToSend = maxCommittedLog;


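                    /**
                     * a) peerLastZxid lies between minCommittedLog and maxCommittedLog:
                     *    iterate over the proposals.
                     *    (1) A proposal whose zxid <= peerLastZxid is already on the peer, so skip it.
                     *    (2) At the first proposal whose zxid > peerLastZxid, check whether the peer
                     *        holds proposals the leader never saw (prevProposalZxid < peerLastZxid).
                     *        If so, send TRUNC first so the peer rolls back to prevProposalZxid.
                     *        Then queue each remaining proposal followed by its COMMIT.
                     * b) peerLastZxid > maxCommittedLog: send TRUNC back to maxCommittedLog.
                     * c) Anything else is not handled here (for example a newly joined node),
                     *    so the packet type stays SNAP and the peer is synced from a full snapshot.
                     */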
                    for (Proposal propose: proposals) {
                        // skip the proposals the peer already has
                        if (propose.packet.getZxid() <= peerLastZxid) {
                            prevProposalZxid = propose.packet.getZxid();
                            continue;
                        } else {
                            // If we are sending the first packet, figure out whether to trunc
                            // in case the follower has some proposals that the leader doesn't
                            if (firstPacket) {
                                firstPacket = false;
                                // Does the peer have some proposals that the leader hasn't seen yet
                                if (prevProposalZxid < peerLastZxid) {
                                    // send a trunc message before sending the diff
                                    packetToSend = Leader.TRUNC;                                        
                                    zxidToSend = prevProposalZxid;
                                    updates = zxidToSend;
                                }
                            }
                            queuePacket(propose.packet);
                            QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                    null, null);
                            queuePacket(qcommit);
                        }
                    }
                } else if (peerLastZxid > maxCommittedLog) {
                    LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                            Long.toHexString(maxCommittedLog),
                            Long.toHexString(updates));

                    packetToSend = Leader.TRUNC;
                    zxidToSend = maxCommittedLog;
                    updates = zxidToSend;
                } else {
                    LOG.warn("Unhandled proposal scenario");
                }
            } else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                // The leader may recently take a snapshot, so the committedLog
                // is empty. We don't need to send snapshot if the follow
                // is already sync with in-memory db.
                LOG.debug("committedLog is empty but leader and follower "
                        + "are in sync, zxid=0x{}",
                        Long.toHexString(peerLastZxid));
                packetToSend = Leader.DIFF;
                zxidToSend = peerLastZxid;
            } else {
                // just let the state transfer happen
                LOG.debug("proposals is empty");
            }               

            LOG.info("Sending " + Leader.getPacketType(packetToSend));
            // Queue the leader's proposals newer than `updates` for this learner
            // and register the learner to receive future proposals.
            leaderLastZxid = leader.startForwarding(this, updates);

        } finally {
            rl.unlock();
        }
        // Send the NEWLEADER packet; its zxid carries the new epoch.
         QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                ZxidUtils.makeZxid(newEpoch, 0), null, null);
         if (getVersion() < 0x10000) {
            oa.writeRecord(newLeaderQP, "packet");
        } else {
            queuedPackets.add(newLeaderQP);
        }
        bufferedOutput.flush();
        //Need to set the zxidToSend to the latest zxid
        // For a snapshot, send the latest zxid processed by the leader's data tree.
        if (packetToSend == Leader.SNAP) {
            zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
        }
        oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
        bufferedOutput.flush();
        
        /* if we are not truncating or sending a diff just send a snapshot */
        // Send the snapshot to the follower.
        if (packetToSend == Leader.SNAP) {
            LOG.info("Sending snapshot last zxid of peer is 0x"
                    + Long.toHexString(peerLastZxid) + " " 
                    + " zxid of leader is 0x"
                    + Long.toHexString(leaderLastZxid)
                    + "sent zxid of db as 0x" 
                    + Long.toHexString(zxidToSend));
            // Dump data to peer
            leader.zk.getZKDatabase().serializeSnapshot(oa);
            oa.writeString("BenWasHere", "signature");
        }
        bufferedOutput.flush();
        
        // Start sending packets
        // Start sending the packets in the queue.
        new Thread() {
            public void run() {
                Thread.currentThread().setName(
                        "Sender-" + sock.getRemoteSocketAddress());
                try {
                    sendPackets();
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption",e);
                }
            }
        }.start();
        
        /*
         * Have to wait for the first ACK, wait until 
         * the leader is ready, and only then we can
         * start processing messages.
         */
        // Read the follower's ACK of NEWLEADER, then block in waitForNewLeaderAck
        // until more than half of the followers have acknowledged.
        qp = new QuorumPacket();
        ia.readRecord(qp, "packet");
        if(qp.getType() != Leader.ACK){
            LOG.error("Next packet was supposed to be an ACK");
            return;
        }
        LOG.info("Received NEWLEADER-ACK message from " + getSid());
        leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());

        syncLimitCheck.start();
        
        // now that the ack has been processed expect the syncLimit
        sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

        /*
         * Wait until leader starts up
         */
        synchronized(leader.zk){
            while(!leader.zk.isRunning() && !this.isInterrupted()){
                leader.zk.wait(20);
            }
        }
        // Mutation packets will be queued during the serialize,
        // so we need to mark when the peer can actually start
        // using the data
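        // Queue an UPTODATE packet to tell the follower it can start using the data.
        // From here on the leader loops over incoming packets from this learner.
        // Data sync is done and the election is complete.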
        queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

        while (true) {
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");

            long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
            if (qp.getType() == Leader.PING) {
                traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
            }
            tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit;


            ByteBuffer bb;
            long sessionId;
            int cxid;
            int type;

            switch (qp.getType()) {
            case Leader.ACK:
                if (this.learnerType == LearnerType.OBSERVER) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Received ACK from Observer  " + this.sid);
                    }
                }
                syncLimitCheck.updateAck(qp.getZxid());
                leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                break;
            case Leader.PING:
                // Process the touches
                ByteArrayInputStream bis = new ByteArrayInputStream(qp
                        .getData());
                DataInputStream dis = new DataInputStream(bis);
                while (dis.available() > 0) {
                    long sess = dis.readLong();
                    int to = dis.readInt();
                    leader.zk.touch(sess, to);
                }
                break;
            case Leader.REVALIDATE:
                bis = new ByteArrayInputStream(qp.getData());
                dis = new DataInputStream(bis);
                long id = dis.readLong();
                int to = dis.readInt();
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                dos.writeLong(id);
                boolean valid = leader.zk.touch(id, to);
                if (valid) {
                    try {
                        //set the session owner
                        // as the follower that
                        // owns the session
                        leader.zk.setOwner(id, this);
                    } catch (SessionExpiredException e) {
                        LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
                    }
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                                             ZooTrace.SESSION_TRACE_MASK,
                                             "Session 0x" + Long.toHexString(id)
                                             + " is valid: "+ valid);
                }
                dos.writeBoolean(valid);
                qp.setData(bos.toByteArray());
                queuedPackets.add(qp);
                break;
            case Leader.REQUEST:                    
                bb = ByteBuffer.wrap(qp.getData());
                sessionId = bb.getLong();
                cxid = bb.getInt();
                type = bb.getInt();
                bb = bb.slice();
                Request si;
                if(type == OpCode.sync){
                    si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                } else {
                    si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                }
                si.setOwner(this);
                leader.zk.submitRequest(si);
                break;
            default:
            }
        }
    } catch (IOException e) {
        if (sock != null && !sock.isClosed()) {
            LOG.error("Unexpected exception causing shutdown while sock "
                    + "still open", e);
            //close the socket to make sure the 
            //other side can see it being close
            try {
                sock.close();
            } catch(IOException ie) {
                // do nothing
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Unexpected exception causing shutdown", e);
    } finally {
        LOG.warn("******* GOODBYE " 
                + (sock != null ? sock.getRemoteSocketAddress() : "<null>")
                + " ********");
        shutdown();
    }
}
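
To make the DIFF / TRUNC / SNAP choice in the sync block easier to follow, here is a minimal stand-alone sketch of just that decision. SyncPlanner, Plan, and Mode are hypothetical names made up for illustration and are not part of ZooKeeper. The committed log is reduced to a sorted list of zxids, and nothing is actually queued or sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not ZooKeeper code): mirrors the DIFF/TRUNC/SNAP choice above. */
final class SyncPlanner {
    enum Mode { DIFF, TRUNC, SNAP }

    /** First packet type, the zxid sent with it, and the zxids to replay afterwards. */
    static final class Plan {
        final Mode mode;
        final long zxidToSend;
        final List<Long> toReplay;

        Plan(Mode mode, long zxidToSend, List<Long> toReplay) {
            this.mode = mode;
            this.zxidToSend = zxidToSend;
            this.toReplay = toReplay;
        }

        @Override
        public String toString() {
            return mode + " zxid=0x" + Long.toHexString(zxidToSend) + " replay=" + toReplay;
        }
    }

    /**
     * @param committed           zxids in the leader's in-memory committed log, ascending
     * @param peerLastZxid        last zxid reported by the learner
     * @param leaderLastProcessed last zxid applied to the leader's data tree
     */
    static Plan plan(List<Long> committed, long peerLastZxid, long leaderLastProcessed) {
        List<Long> replay = new ArrayList<>();
        if (committed.isEmpty()) {
            // Empty log: an empty DIFF is enough if both sides are already in sync.
            return peerLastZxid == leaderLastProcessed
                    ? new Plan(Mode.DIFF, peerLastZxid, replay)
                    : new Plan(Mode.SNAP, leaderLastProcessed, replay);
        }
        long min = committed.get(0);
        long max = committed.get(committed.size() - 1);
        if (peerLastZxid > max) {
            // Case b): the peer is ahead of the leader, roll it back to max.
            return new Plan(Mode.TRUNC, max, replay);
        }
        if (peerLastZxid < min) {
            // Case c): the peer is too far behind, fall back to a full snapshot.
            return new Plan(Mode.SNAP, leaderLastProcessed, replay);
        }
        // Case a): the peer is inside the committed range.
        Mode mode = Mode.DIFF;
        long zxidToSend = max;
        long prev = min;
        boolean first = true;
        for (long zxid : committed) {
            if (zxid <= peerLastZxid) {
                prev = zxid; // the peer already has this one
                continue;
            }
            if (first) {
                first = false;
                if (prev < peerLastZxid) {
                    // The peer holds a zxid the leader never saw: truncate first.
                    mode = Mode.TRUNC;
                    zxidToSend = prev;
                }
            }
            replay.add(zxid);
        }
        return new Plan(mode, zxidToSend, replay);
    }

    public static void main(String[] args) {
        System.out.println(plan(Arrays.asList(5L, 6L, 7L, 8L), 6L, 8L));
        System.out.println(plan(Arrays.asList(5L, 7L, 9L), 8L, 9L));
    }
}
```

The second call in main is the interesting one: the peer reports zxid 8, which the leader's log does not contain, so the plan is to truncate the peer back to 7 and then replay 9.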

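The LEADERINFO and NEWLEADER packets above both carry ZxidUtils.makeZxid(newEpoch, 0). As a rough sketch of what that value looks like, assuming the usual zxid layout (epoch in the high 32 bits, per-epoch counter in the low 32 bits), the packing can be written as below. ZxidLayout is a hypothetical stand-in written for this note, not the real ZxidUtils class.

```java
/** Hypothetical stand-in for illustration; assumes epoch = high 32 bits, counter = low 32 bits. */
final class ZxidLayout {
    private ZxidLayout() {
    }

    /** Pack an epoch and a per-epoch counter into one 64-bit zxid. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    /** Extract the epoch (high 32 bits). */
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    /** Extract the counter (low 32 bits). */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long first = makeZxid(3, 0);
        System.out.println("zxid=0x" + Long.toHexString(first)
                + " epoch=" + epochOf(first)
                + " counter=" + counterOf(first));
    }
}
```

With this layout the first zxid of a new epoch is larger than every zxid from an earlier epoch. That is why the old-version branch in the handler can recover the epoch from the learner's last zxid with getEpochFromZxid.
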
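These are just some rough notes for my own reference. This is my first write-up and I have little experience, so please point out anything that is wrong.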
