ZooKeeper source code analysis (4): the election process and server startup

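ZooKeeper source code analysis (1): server startup flow showed that the servers in a cluster hold an election when they start. This article analyzes the election process and the subsequent startup of the leader, follower and observer servers.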

Leader election

First, some election-related terms:

  • SID: server ID, the same as the myid value
  • ZXID: transaction ID, which identifies the transaction state of a server; the larger the value, the newer the server's data
  • Vote: the vote object, with the following fields:
final private long id;  //SID of the proposed Leader
final private long zxid;  //ZXID of the proposed Leader
final private long electionEpoch;//logical clock, used to tell whether votes belong to the same election round; incremented by 1 at each new round
final private long peerEpoch;//epoch of the proposed Leader
final private ServerState state;//state of the server that cast the vote
  • Server state ServerState
 public enum ServerState {
        LOOKING, //looking for a Leader; a server in this state enters the election process
        FOLLOWING,//follower: serves non-transactional requests itself and forwards transactional requests to the Leader
        LEADING,//leader
        OBSERVING;//observer: does not take part in elections; serves non-transactional requests itself and forwards transactional requests to the Leader
    }
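  • QuorumCnxManager
    When each server creates its FastLeaderElection object, it also starts a QuorumCnxManager. This component handles the low-level network communication between servers during Leader election. The class maintains a set of queues that hold received messages and messages waiting to be sent. On the sending side there is a separate queue for each other server, so sends to different servers do not interfere with each other. Its core fields are: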
//receive queue: holds messages received from other servers
public final ArrayBlockingQueue<Message> recvQueue;
//send queues, grouped by SID: hold messages waiting to be sent, so sending to one server does not affect the others
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
//SendWorker is the message sender; this map holds one sender per SID
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap; 
//last message sent to each SID
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

QuorumCnxManager creates one SendWorker thread and one RecvWorker thread for each remote server.

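  • Sending messages:
    Each SendWorker keeps taking messages from its own send queue, sends them, and records each one in lastMessageSent. When a SendWorker starts (for example after a reconnection) and its queue is empty, it first resends the last message stored in lastMessageSent. This covers the case where the peer did not correctly receive or process that message; the peer handles duplicate messages correctly.
  • Receiving messages:
    Each RecvWorker keeps reading messages from its TCP connection and puts them into recvQueue.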
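The send side described above can be modeled without sockets or threads. The following is a minimal sketch, not ZooKeeper's code. `SendSideModel`, `toSend`, `runSendWorkerOnce` and the `wire` map are hypothetical names. The capacity of 1 and the "drop the oldest message when full" policy are assumptions about how QuorumCnxManager keeps only the newest vote per peer. Each call to `runSendWorkerOnce` models one start of a SendWorker: it first resends the last message if nothing newer is queued, then drains the queue.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of QuorumCnxManager's send side: one queue per SID plus the last
// message sent to each SID. "Sending" appends to a per-SID list instead of a socket.
class SendSideModel {
    static final int SEND_CAPACITY = 1; // assumption: keep only the newest pending vote

    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent = new ConcurrentHashMap<>();
    final ConcurrentHashMap<Long, List<String>> wire = new ConcurrentHashMap<>();

    // Enqueue a message for one peer; queues of other peers are untouched.
    void toSend(long sid, String msg) {
        ArrayBlockingQueue<ByteBuffer> q =
                queueSendMap.computeIfAbsent(sid, k -> new ArrayBlockingQueue<>(SEND_CAPACITY));
        ByteBuffer b = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
        while (!q.offer(b)) {
            q.poll(); // queue full (assumed policy): drop the oldest message
        }
    }

    // One start of a SendWorker: resend the last message if the queue is empty,
    // then drain the queue, remembering each message in lastMessageSent.
    void runSendWorkerOnce(long sid) {
        ArrayBlockingQueue<ByteBuffer> q = queueSendMap.get(sid);
        if (q == null || q.isEmpty()) {
            ByteBuffer last = lastMessageSent.get(sid);
            if (last != null) {
                send(sid, last);
            }
        }
        ByteBuffer b;
        while (q != null && (b = q.poll()) != null) {
            lastMessageSent.put(sid, b);
            send(sid, b);
        }
    }

    private void send(long sid, ByteBuffer b) {
        String s = new String(b.array(), StandardCharsets.UTF_8);
        wire.computeIfAbsent(sid, k -> new ArrayList<>()).add(s);
    }
}
```

For example, after `toSend(2, "vote-A")` and `toSend(2, "vote-B")`, only `vote-B` goes out to server 2. A later restart of that worker with an empty queue sends `vote-B` a second time.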
Next, let's look at how connections between servers are created:
private boolean startConnection(Socket sock, Long sid)
            throws IOException {
        DataOutputStream dout = null;
        DataInputStream din = null;
        try {
            // Use BufferedOutputStream to reduce the number of IP packets. This is
            // important for x-DC scenarios.
            BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
            dout = new DataOutputStream(buf);

            // Sending id and challenge
            // represents protocol version (in other words - message type)
            dout.writeLong(PROTOCOL_VERSION);
            dout.writeLong(self.getId());
            String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
            byte[] addr_bytes = addr.getBytes();
            dout.writeInt(addr_bytes.length);
            dout.write(addr_bytes);
            dout.flush();

            din = new DataInputStream(
                    new BufferedInputStream(sock.getInputStream()));
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }

        // authenticate learner
        QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
        if (qps != null) {
            // TODO - investigate why reconfig makes qps null.
            authLearner.authenticate(sock, qps.hostname);
        }

        // If lost the challenge, then drop the new connection
        if (sid > self.getId()) {
            LOG.info("Have smaller server identifier, so dropping the " +
                     "connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            // Otherwise proceed with the connection
        } else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();
            
            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            
            sw.start();
            rw.start();
            
            return true;    
            
        }
        return false;
    }

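Connections are created pairwise under one rule: a server keeps a connection it initiates only if its own SID is larger than the remote SID; otherwise it drops the connection. Correspondingly, in receiveConnection a server accepts an incoming connection only when the remote SID is larger than its own. If the remote SID is smaller, the server closes the connection and connects to that server itself. This prevents duplicate connections between any two servers.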
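The tie-break can be checked with a small model. In this hypothetical sketch (`ConnectionRule` and its methods are illustrative names, not ZooKeeper API), each side's decision is a pure function of the two SIDs. `survivingConnections` counts how many connections remain when two servers dial each other at the same moment.

```java
// Hypothetical model of the connection tie-break: when two servers dial each other,
// only the connection initiated by the larger SID survives.
class ConnectionRule {
    // Initiator side (as in startConnection): keep the socket only if our SID is larger.
    static boolean initiatorKeeps(long mySid, long remoteSid) {
        return mySid > remoteSid;
    }

    // Receiver side (as in receiveConnection): keep it only if the remote SID is larger;
    // otherwise the receiver closes it and dials the remote server itself.
    static boolean receiverKeeps(long mySid, long remoteSid) {
        return remoteSid > mySid;
    }

    // Connections left between servers a and b when both dial simultaneously.
    static int survivingConnections(long a, long b) {
        int n = 0;
        if (initiatorKeeps(a, b) && receiverKeeps(b, a)) n++; // a dials b
        if (initiatorKeeps(b, a) && receiverKeeps(a, b)) n++; // b dials a
        return n;
    }
}
```

Both sides apply the same comparison, so they always agree on which connection to keep. Exactly one connection remains for any pair of distinct SIDs.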

The implementation flow of the leader election algorithm is as follows:


The main election method is FastLeaderElection.lookForLeader:

public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
           self.start_fle = Time.currentElapsedTime();
        }
        try {
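            //all external votes this server has received in the current election round, keyed by sender SID
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();

            //votes from servers that are already FOLLOWING or LEADING (outside the election)
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();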

            int notTimeout = finalizeWait;

            synchronized(this){
                logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                } 
                else if (validVoter(n.sid) && validVoter(n.leader)) {
                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view for a replica in the current or next voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid" + n.sid);
                            break;
                        }
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock.get()){
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            if(termPredicate(recvset, new Vote(n.leader,
                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         * Only peer epoch is used to check that the votes come
                         * from the same ensemble. This is because there is at
                         * least one corner case in which the ensemble can be
                         * created with inconsistent zxid and election epoch
                         * info. However, given that only one ensemble can be
                         * running at a single point in time and that each 
                         * epoch is used only once, using only the epoch to 
                         * compare the votes is sufficient.
                         * 
                         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
                         */
                        outofelection.put(n.sid, new Vote(n.leader, 
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                        if (termPredicate(outofelection, new Vote(n.leader,
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                            synchronized(this){
                                logicalclock.set(n.electionEpoch);
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: " + n.state
                              + " (n.state), " + n.sid + " (n.sid)");
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if(self.jmxLeaderElectionBean != null){
                    MBeanRegistry.getInstance().unregister(
                            self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}",
                    manager.getConnectionThreadCount());
        }
    }

The election proceeds as follows:
1. Increment the election round

//puts all valid votes in the same round
logicalclock.incrementAndGet();

2. Initialize the vote
Before the first election, every server proposes itself as Leader.

//called as updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
//so the proposed leader is this server's own myid
synchronized void updateProposal(long leader, long zxid, long epoch){
        proposedLeader = leader;
        proposedZxid = zxid;
        proposedEpoch = epoch;
    }

3. Send the initial vote

private void sendNotifications() {
        for (long sid : self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier qv = self.getQuorumVerifier();
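            ToSend notmsg = new ToSend(
                    //message type: notification
                    ToSend.mType.notification,
                    //myid of the proposed leader
                    proposedLeader,
                    //zxid of the proposed leader
                    proposedZxid,
                    //current election round
                    logicalclock.get(),
                    //state of this server
                    QuorumPeer.ServerState.LOOKING,
                    //SID of the recipient (the loop variable), not this server's myid
                    sid,
                    //epoch of the proposed leader; initially this server's currentEpoch (the value in the currentEpoch file)
                    proposedEpoch, 
                    //current quorum configuration (the servers taking part in the election)
                    qv.toString().getBytes());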
            sendqueue.offer(notmsg);
        }
    }

The server sends its vote to every server that takes part in the election, including itself.
Note: after the FastLeaderElection object is created, its start method is called:

// FastLeaderElection.start()
public void start() {
    this.messenger.start();
}

// Messenger.start(): starts instances of WorkerSender and WorkerReceiver
void start(){
    this.wsThread.start();
    this.wrThread.start();
}

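This starts two threads, wsThread and wrThread, which wrap WorkerSender and WorkerReceiver respectively. WorkerSender keeps taking messages from FastLeaderElection.sendqueue and hands them to QuorumCnxManager, which puts them into the matching queue in queueSendMap to be sent out. WorkerReceiver keeps taking messages from QuorumCnxManager's recvQueue and adds them to FastLeaderElection.recvqueue.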
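The two Messenger threads form a simple bridge between the election-level queues and the connection-level queues. This is a hypothetical sketch of that bridge. `MessengerModel`, `networkOut`, `networkIn`, `pump` and `pollQuietly` are illustrative names. Plain strings stand in for ToSend/Notification objects. The real WorkerSender and WorkerReceiver also serialize and deserialize messages, and these steps are left out here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of FastLeaderElection.Messenger: two daemon threads move messages
// from sendqueue to the network side, and from the network side to recvqueue.
class MessengerModel {
    final BlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();   // election side, outgoing
    final BlockingQueue<String> recvqueue = new LinkedBlockingQueue<>();   // election side, incoming
    final BlockingQueue<String> networkOut = new LinkedBlockingQueue<>();  // stand-in for queueSendMap
    final BlockingQueue<String> networkIn = new LinkedBlockingQueue<>();   // stand-in for recvQueue

    private volatile boolean stop = false;

    // A worker thread that repeatedly moves one message from 'from' to 'to'.
    private Thread pump(String name, BlockingQueue<String> from, BlockingQueue<String> to) {
        Thread t = new Thread(() -> {
            while (!stop) {
                try {
                    String m = from.poll(100, TimeUnit.MILLISECONDS);
                    if (m != null) {
                        to.put(m);
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, name);
        t.setDaemon(true);
        return t;
    }

    void start() {
        pump("WorkerSender", sendqueue, networkOut).start();
        pump("WorkerReceiver", networkIn, recvqueue).start();
    }

    void halt() {
        stop = true;
    }

    // Poll with a timeout, returning null instead of throwing if interrupted.
    static String pollQuietly(BlockingQueue<String> q, long millis) {
        try {
            return q.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Decoupling the two layers through queues means lookForLeader never blocks on a socket. It only ever reads recvqueue and writes sendqueue.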

The implementation flow chart is as follows:


(Figure: vote management)

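While the server is in the ServerState.LOOKING state, it keeps running the election loop:
4. Receive an external vote (Notification n)
The server polls recvqueue with a timeout. If no external vote arrives in time, there are two cases. If QuorumCnxManager.haveDelivered() reports that the queued messages have been delivered, the server resends its own vote. Otherwise it calls manager.connectAll() to re-establish connections to the other servers. In both cases the poll timeout is doubled, up to maxNotificationInterval.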
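The timeout handling in step 4 is a capped exponential backoff. The sketch below copies the arithmetic from lookForLeader into a standalone class. `NotificationBackoff` and `schedule` are hypothetical names. The starting value (finalizeWait) and the cap (maxNotificationInterval) are passed in as parameters; this example assumes 200 ms and 60000 ms.

```java
// Sketch of the poll-timeout backoff used in lookForLeader when no notification arrives.
class NotificationBackoff {
    // Same arithmetic as the source: double the timeout, but never exceed the cap.
    static int nextTimeout(int notTimeout, int maxNotificationInterval) {
        int tmpTimeOut = notTimeout * 2;
        return tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval;
    }

    // Timeouts used by successive empty polls, starting from finalizeWait.
    static int[] schedule(int finalizeWait, int maxNotificationInterval, int polls) {
        int[] out = new int[polls];
        int t = finalizeWait;
        for (int i = 0; i < polls; i++) {
            out[i] = t;
            t = nextTimeout(t, maxNotificationInterval);
        }
        return out;
    }
}
```

With the assumed values, ten consecutive empty polls wait 200, 400, 800, 1600, 3200, 6400, 12800, 25600, 51200 and 60000 ms. After that, every retry stays at the cap, so an isolated server keeps retrying at a bounded rate.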
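5. Check the election round (if an external vote was received)

  • If the external vote's round is greater than the internal one (n.electionEpoch > logicalclock.get()), the server immediately updates its own round with logicalclock.set(n.electionEpoch) and clears all votes received so far with recvset.clear(). It then uses totalOrderPredicate to compare the external vote against its initial vote (its own SID, last logged ZXID and epoch), adopts the winner as its internal vote, and sends that vote out.
  • If the external vote's round is smaller than the internal one, the server ignores the external vote and returns to step 4.
  • If the external and internal rounds are equal, the server compares the two votes.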
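The three cases of step 5 depend only on electionEpoch and logicalclock. The hypothetical `RoundCheck` class below captures that decision and the reset that goes with a newer round. Its recvset maps SID to proposed leader only, which is a simplification of the real `Map<Long, Vote>`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of step 5: compare an incoming vote's electionEpoch with logicalclock.
class RoundCheck {
    enum Decision { NEW_ROUND, STALE, SAME_ROUND }

    long logicalclock;
    final Map<Long, Long> recvset = new HashMap<>(); // sid -> proposed leader (simplified)

    RoundCheck(long logicalclock) {
        this.logicalclock = logicalclock;
    }

    Decision onNotification(long electionEpoch) {
        if (electionEpoch > logicalclock) {
            logicalclock = electionEpoch; // jump to the newer round
            recvset.clear();              // votes from the old round no longer count
            return Decision.NEW_ROUND;    // caller re-compares with its initial vote and rebroadcasts
        } else if (electionEpoch < logicalclock) {
            return Decision.STALE;        // ignore the vote and go back to step 4
        }
        return Decision.SAME_ROUND;       // go on to compare the votes (step 6)
    }
}
```

Clearing recvset on a round change matters because votes counted toward an old round must not help a proposal reach a quorum in the new one.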
6. Vote comparison: FastLeaderElection#totalOrderPredicate
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */

        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

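Factors are compared in this order of priority:
1. the epoch of the proposed Leader (peerEpoch) 2. ZXID 3. SID; the larger value wins. The election round (electionEpoch) is not compared here, because step 5 has already dealt with it.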
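The comparison can be exercised on its own. `VoteOrder` below wraps the same boolean expression as the totalOrderPredicate shown above. It adds a hypothetical helper, `best`, that folds a list of candidate votes `{sid, zxid, peerEpoch}` down to the one that wins every pairwise comparison.

```java
// The comparison from totalOrderPredicate, plus a hypothetical helper to pick a winner.
class VoteOrder {
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    // Each vote is {sid, zxid, peerEpoch}; returns the SID of the winning vote.
    static long best(long[]... votes) {
        long[] cur = votes[0];
        for (int i = 1; i < votes.length; i++) {
            long[] v = votes[i];
            if (totalOrderPredicate(v[0], v[1], v[2], cur[0], cur[1], cur[2])) {
                cur = v;
            }
        }
        return cur[0];
    }
}
```

On a fresh three-server cluster where every zxid and epoch is 0, the SID decides, so server 3 wins. If server 1 has the highest zxid at the same epoch, server 1 wins regardless of SID. This is what keeps the most up-to-date server as Leader.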
7. Change the vote and broadcast the change

updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();

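8. Archive the vote
recvset records all external votes this server has received in the current round of Leader election, keyed by the sender's SID. A newer vote from the same SID replaces the older one.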
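9. Count the votes and update the server state
termPredicate checks whether more than half of the servers (a quorum) have cast, in recvset, the same vote as the current internal vote. If it returns true, the server keeps reading recvqueue, with each read waiting up to finalizeWait ms. Notifications that do not beat the current proposal are discarded. If one arrives that does beat it (according to totalOrderPredicate), it is put back into the queue and the server returns to step 4. Only when a read times out with nothing new is the proposal final. The server then updates its state: LEADING if the proposed Leader is itself, otherwise the learner state (FOLLOWING, or OBSERVING for an observer). If termPredicate returns false, the server returns to step 4.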
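The quorum check in step 9 counts the entries in recvset that equal the proposal. This is a hypothetical, simplified sketch. The real termPredicate delegates to the quorum verifier, which can also be hierarchical or weighted; this sketch assumes a plain majority of `votingMembers`. Vote equality here compares the four fields listed for Vote at the top of the article.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical model of termPredicate under an assumed simple-majority quorum.
class QuorumCount {
    // Simplified vote: proposed leader, zxid, election round, peer epoch.
    static final class Vote {
        final long leader, zxid, electionEpoch, peerEpoch;

        Vote(long leader, long zxid, long electionEpoch, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Vote)) return false;
            Vote v = (Vote) o;
            return leader == v.leader && zxid == v.zxid
                    && electionEpoch == v.electionEpoch && peerEpoch == v.peerEpoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(leader, zxid, electionEpoch, peerEpoch);
        }
    }

    // True once strictly more than half of the voting members back the proposal.
    static boolean termPredicate(Map<Long, Vote> recvset, Vote proposal, int votingMembers) {
        int acks = 0;
        for (Vote v : recvset.values()) {
            if (v.equals(proposal)) {
                acks++;
            }
        }
        return acks > votingMembers / 2;
    }
}
```

In a five-server ensemble, two matching votes are not enough, but three are. Because recvset is keyed by sender SID, a server that switches its vote replaces its old entry rather than being counted twice.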

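This completes the analysis of the election process. Once each server's role has been determined, let's look at how each kind of server starts up.
First, here is a diagram of the interaction between the Leader and Follower servers during startup: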


Leader server startup

Main method: Leader.lead()

void lead() throws IOException, InterruptedException {
        // ········ code for recording election time and registering JMX omitted ········
        try {
            self.tick.set(0);
            zk.loadData();

            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

            // Start thread that waits for connection requests from new followers.
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();

            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

            synchronized(this){
                lastProposed = zk.getZxid();
            }

            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                   null, null);

            QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
            QuorumVerifier curQV = self.getQuorumVerifier();
            if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
               
               try {
                   QuorumVerifier newQV = self.configFromString(curQV.toString());
                   newQV.setVersion(zk.getZxid());
                   self.setLastSeenQuorumVerifier(newQV, true);    
               } catch (Exception e) {
                   throw new IOException(e);
               }
            }
            
            newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
            if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
               newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }
            
            // We have to get at least a majority of servers in sync with
            // us. We do this by waiting for the NEWLEADER packet to get
            // acknowledged
                       
             waitForEpochAck(self.getId(), leaderStateSummary);
             self.setCurrentEpoch(epoch);    
            
             try {
                 waitForNewLeaderAck(self.getId(), zk.getZxid());
             } catch (InterruptedException e) {
                 shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                         + newLeaderProposal.ackSetsToString() + " ]");
                 HashSet<Long> followerSet = new HashSet<Long>();

                 for(LearnerHandler f : getLearners()) {
                     if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
                         followerSet.add(f.getSid());
                     }
                 }    
                 boolean initTicksShouldBeIncreased = true;
                 for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
                     if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                         initTicksShouldBeIncreased = false;
                         break;
                     }
                 }                  
                 if (initTicksShouldBeIncreased) {
                     LOG.warn("Enough followers present. "+
                             "Perhaps the initTicks need to be increased.");
                 }
                 return;
             }

             startZkServer();
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                long zxid = Long.parseLong(initialZxid);
                zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
            }

            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                self.setZooKeeperServer(zk);
            }

            self.adminServer.setZooKeeperServer(zk);
            boolean tickSkip = true;
            // If not null then shutdown this leader
            String shutdownMessage = null;

            while (true) {
                synchronized (this) {
                    long start = Time.currentElapsedTime();
                    long cur = start;
                    long end = start + self.tickTime / 2;
                    while (cur < end) {
                        wait(end - cur);
                        cur = Time.currentElapsedTime();
                    }

                    if (!tickSkip) {
                        self.tick.incrementAndGet();
                    }

                    // We use an instance of SyncedLearnerTracker to
                    // track synced learners to make sure we still have a
                    // quorum of current (and potentially next pending) view.
                    SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                    syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
                    if (self.getLastSeenQuorumVerifier() != null
                            && self.getLastSeenQuorumVerifier().getVersion() > self
                                    .getQuorumVerifier().getVersion()) {
                        syncedAckSet.addQuorumVerifier(self
                                .getLastSeenQuorumVerifier());
                    }

                    syncedAckSet.addAck(self.getId());

                    for (LearnerHandler f : getLearners()) {
                        if (f.synced()) {
                            syncedAckSet.addAck(f.getSid());
                        }
                    }

                    // check leader running status
                    if (!this.isRunning()) {
                        // set shutdown flag
                        shutdownMessage = "Unexpected internal error";
                        break;
                    }

                    if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
                        // Lost quorum of last committed and/or last proposed
                        // config, set shutdown flag
                        shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
                                + syncedAckSet.ackSetsToString() + " ]";
                        break;
                    }
                    tickSkip = !tickSkip;
                }
                for (LearnerHandler f : getLearners()) {
                    f.ping();
                }
            }
            if (shutdownMessage != null) {
                shutdown(shutdownMessage);
                // leader goes in looking state
            }
        } finally {
            zk.unregisterJMX(this);
        }
    }

1. Reload the snapshot and transaction log data; see ZooKeeper source code analysis (6): data and storage

2. Start the Learner acceptor LearnerCnxAcceptor
LearnerCnxAcceptor accepts connection requests from all non-Leader servers. These connections carry the cluster's non-election communication.
LearnerCnxAcceptor.run():
 public void run() {
                while (!stop) {
                        Socket s = ss.accept();
                        // start with the initLimit, once the ack is processed
                        // in LearnerHandler switch to the syncLimit
                        s.setSoTimeout(self.tickTime * self.initLimit);
                        s.setTcpNoDelay(nodelay);
                        BufferedInputStream is = new BufferedInputStream(
                                s.getInputStream());
                        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                        fh.start();
                    }
                // ······· exception handling omitted ·······
}

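3. Create a LearnerHandler: when a connection request arrives from another server, the acceptor creates a LearnerHandler instance. This instance handles message communication and data synchronization between the Leader and that server.
4. Receive the Learner's registration: the first message a new LearnerHandler receives is of type FOLLOWERINFO or OBSERVERINFO. For the message types, see ZooKeeper inter-cluster communication types.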
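The payload of that first FOLLOWERINFO/OBSERVERINFO packet is parsed in LearnerHandler.run() (shown in step 6) by length. It holds an 8-byte SID, then a 4-byte protocol version, then an 8-byte config version. The hypothetical `LearnerInfoCodec` below builds and reads that layout with `java.nio.ByteBuffer` (big-endian by default). The encoder is an assumption that mirrors what the parser expects, not the Learner's actual serialization code. The `-1` defaults for missing fields are also illustrative.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for the learner info layout that LearnerHandler.run() parses.
class LearnerInfoCodec {
    static final class Info {
        long sid = -1;           // illustrative default when the field is absent
        int version = -1;
        long configVersion = -1;
    }

    // 8-byte SID + 4-byte protocol version + 8-byte config version = 20 bytes.
    static byte[] encode(long sid, int protocolVersion, long configVersion) {
        return ByteBuffer.allocate(20)
                .putLong(sid)
                .putInt(protocolVersion)
                .putLong(configVersion)
                .array();
    }

    // Same length checks as the parsing code: older learners may send fewer fields.
    static Info decode(byte[] data) {
        Info info = new Info();
        ByteBuffer bb = ByteBuffer.wrap(data);
        if (data.length >= 8) info.sid = bb.getLong();
        if (data.length >= 12) info.version = bb.getInt();
        if (data.length >= 20) info.configVersion = bb.getLong();
        return info;
    }
}
```

Checking the length before each read lets a newer Leader accept registrations from older Learners that send only the SID.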
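5. The Leader parses the Learner's message and computes the new epoch (getEpochToPropose)
The new epoch is one greater than the largest accepted epoch seen. The Leader calls getEpochToPropose with its own acceptedEpoch (see lead()), and each LearnerHandler calls it with its Learner's lastAcceptedEpoch. Whenever the epoch passed in is greater than or equal to the current candidate, the candidate becomes that epoch + 1. The calling thread then waits until a quorum of participants, including the Leader, has connected, and at that point the Leader's epoch is fixed. If no quorum forms within initLimit * tickTime, the wait ends with an InterruptedException.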
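A single-threaded model makes the epoch rule easy to follow. In this hypothetical `EpochProposal` class, `offer` returns the decided epoch, or -1 while a quorum is still missing; the real method blocks instead. A simple majority of `votingMembers` is assumed, and the real code adds only participants (not observers) to the connecting set. The helpers reproduce the ZXID layout that the source uses through ZxidUtils.makeZxid: the epoch in the high 32 bits and a counter in the low 32 bits.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, single-threaded model of Leader.getEpochToPropose (no blocking, no timeout).
class EpochProposal {
    private final long leaderSid;
    private final int votingMembers;                 // assumption: simple-majority quorum
    private final Set<Long> connectingFollowers = new HashSet<>();
    private long epoch = -1;
    private boolean waitingForNewEpoch = true;

    EpochProposal(long leaderSid, int votingMembers) {
        this.leaderSid = leaderSid;
        this.votingMembers = votingMembers;
    }

    // Returns the decided epoch, or -1 while a quorum (including the leader) is still missing.
    long offer(long sid, long lastAcceptedEpoch) {
        if (!waitingForNewEpoch) {
            return epoch;                            // already decided; later learners cannot change it
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;           // stay ahead of every epoch seen so far
        }
        connectingFollowers.add(sid);
        if (connectingFollowers.contains(leaderSid) && connectingFollowers.size() > votingMembers / 2) {
            waitingForNewEpoch = false;
            return epoch;
        }
        return -1;
    }

    // ZXID layout: high 32 bits = epoch, low 32 bits = counter within the epoch.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }
}
```

Take three voting members. A Leader with acceptedEpoch 4 alone cannot decide. Once a Learner with lastAcceptedEpoch 6 connects, the epoch is fixed at 7, and a Learner arriving later with epoch 9 still receives 7. The first ZXID of the new epoch is `makeZxid(7, 0)`, which is `0x700000000`.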
6. The Leader sends its state to the other servers
LearnerHandler.run:

public void run() {
        try {
            leader.addLearnerHandler(this);
            tickOfNextAckDeadline = leader.self.tick.get()
                    + leader.self.initLimit + leader.self.syncLimit;

            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);

            QuorumPacket qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
                LOG.error("First packet " + qp.toString()
                        + " is not FOLLOWERINFO or OBSERVERINFO!");
                return;
            }

            byte learnerInfoData[] = qp.getData();
            if (learnerInfoData != null) {
                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                if (learnerInfoData.length >= 8) {
                    this.sid = bbsid.getLong();
                }
                if (learnerInfoData.length >= 12) {
                    this.version = bbsid.getInt(); // protocolVersion
                }
                if (learnerInfoData.length >= 20) {
                    long configVersion = bbsid.getLong();
                    if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
                        throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                    }
                }
            } else {
                this.sid = leader.followerCounter.getAndDecrement();
            }

            if (leader.self.getView().containsKey(this.sid)) {
                LOG.info("Follower sid: " + this.sid + " : info : "
                        + leader.self.getView().get(this.sid).toString());
            } else {
                LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
            }
                        
            if (qp.getType() == Leader.OBSERVERINFO) {
                  learnerType = LearnerType.OBSERVER;
            }

            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

            long peerLastZxid;
            StateSummary ss = null;
            long zxid = qp.getZxid();
            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

            if (this.getVersion() < 0x10000) {
                // we are going to have to extrapolate the epoch information
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                // fake the message
                leader.waitForEpochAck(this.getSid(), ss);
            } else {
                byte ver[] = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
                oa.writeRecord(newEpochPacket, "packet");
                bufferedOutput.flush();
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error(ackEpochPacket.toString()
                            + " is not ACKEPOCH");
                    return;
                }
                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                leader.waitForEpochAck(this.getSid(), ss);
            }
            peerLastZxid = ss.getLastZxid();
           
            // Take any necessary action if we need to send TRUNC or DIFF
            // startForwarding() will be called in all cases
            boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
            
            LOG.debug("Sending NEWLEADER message to " + sid);
            // the version of this quorumVerifier will be set by leader.lead() in case
            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
            // we got here, so the version was set
            if (getVersion() < 0x10000) {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, null, null);
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                                .toString().getBytes(), null);
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();

            /* if we are not truncating or sending a diff just send a snapshot */
            if (needSnap) {
                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                LearnerSnapshot snapshot = 
                        leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                try {
                    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                    bufferedOutput.flush();
                    // Dump data to peer
                    leader.zk.getZKDatabase().serializeSnapshot(oa);
                    oa.writeString("BenWasHere", "signature");
                    bufferedOutput.flush();
                } finally {
                    snapshot.close();
                }
            }

            // Start thread that blast packets in the queue to learner
            startSendingPackets();
            
            /*
             * Have to wait for the first ACK, wait until
             * the leader is ready, and only then we can
             * start processing messages.
             */
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.ACK){
                LOG.error("Next packet was supposed to be an ACK,"
                    + " but received packet: {}", packetToString(qp));
                return;
            }

            if(LOG.isDebugEnabled()){
                LOG.debug("Received NEWLEADER-ACK message from " + sid);   
            }
            leader.waitForNewLeaderAck(getSid(), qp.getZxid());

            syncLimitCheck.start();
            
            // now that the ack has been processed expect the syncLimit
            sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

            /*
             * Wait until leader starts up
             */
            synchronized(leader.zk){
                while(!leader.zk.isRunning() && !this.isInterrupted()){
                    leader.zk.wait(20);
                }
            }
            // Mutation packets will be queued during the serialize,
            // so we need to mark when the peer can actually start
            // using the data
            //
            LOG.debug("Sending UPTODATE message to " + sid);      
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

while(true){
    // ... from here on the leader and the learner are in sync, the zkServer has started,
    // and inter-server messages can be processed ...
}

After the LearnerHandler starts, it sends LEADERINFO to the Learner. At this point both the Leader thread and the LearnerHandler thread wait in leader.waitForEpochAck(this.getSid(), ss).
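The wait in leader.waitForEpochAck() behaves like a quorum barrier: each caller records an epoch ack and then blocks until enough voters have acked. Below is a minimal sketch of that idea, assuming a plain majority over a fixed number of voters. The class EpochAckBarrier and its method are hypothetical. The real method also requires the Leader's own SID in the set, ignores Observers, and gives up after initLimit * tickTime; the one detail taken directly from the protocol is that an ACKEPOCH carrying epoch -1 is not counted.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the barrier behind Leader.waitForEpochAck():
// every caller records its ack and blocks until a strict majority of voters has acked.
class EpochAckBarrier {
    private final int votingMembers;              // assumption: plain majority, no weights or groups
    private final Set<Long> ackedSids = new HashSet<>();
    private boolean quorumReached = false;

    EpochAckBarrier(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    // Records an ACKEPOCH from sid (an epoch of -1 is not counted) and waits.
    // Returns true once a quorum has acked, false on timeout or interrupt.
    synchronized boolean ackAndWait(long sid, int ackedEpoch, long timeoutMs) {
        if (ackedEpoch != -1) {
            ackedSids.add(sid);
        }
        if (ackedSids.size() > votingMembers / 2) {
            quorumReached = true;
            notifyAll();                          // release every thread blocked below
        }
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!quorumReached) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;                     // timed out before a quorum acked
            }
            try {
                wait(remaining);                  // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

With three voters, the first counted ack blocks until it times out (or until another thread adds a second counted ack); the second counted ack reaches the majority, sets the flag and wakes every waiting thread.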
7. Learners respond with ACKEPOCH
Once a quorum (more than half) of the voting servers, which excludes Observers, has replied with ACKEPOCH, the Leader starts leader-follower data synchronization with boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader); see ZooKeeper Source Code Analysis (6): Data and Storage.
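The full syncFollower() logic is covered in part (6). As a rough sketch, ignoring forced snapshots, syncing from the on-disk transaction log, and the combined TRUNC+DIFF case, the choice depends on where the learner's peerLastZxid falls relative to the window [minCommittedLog, maxCommittedLog] of committed proposals the leader keeps in memory. SyncMode and SyncPlanner are hypothetical names.

```java
// Hypothetical, simplified version of the sync-mode choice in LearnerHandler.syncFollower().
enum SyncMode { DIFF, TRUNC, SNAP }

class SyncPlanner {
    static SyncMode choose(long peerLastZxid, long lastProcessedZxid,
                           long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid == lastProcessedZxid) {
            return SyncMode.DIFF;      // already in sync: an empty DIFF
        }
        if (peerLastZxid > maxCommittedLog) {
            return SyncMode.TRUNC;     // learner is ahead: truncate back to maxCommittedLog
        }
        if (peerLastZxid >= minCommittedLog) {
            return SyncMode.DIFF;      // replay the missing proposals from the committed log
        }
        return SyncMode.SNAP;          // too far behind: send a full snapshot (needSnap = true)
    }
}
```

Only the SNAP outcome corresponds to needSnap being true in the code above; DIFF and TRUNC are queued as packets ahead of NEWLEADER.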
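8. zkServer startup
Once synchronization has begun, the Leader thread waits in waitForNewLeaderAck(self.getId(), zk.getZxid()). Whenever a LearnerHandler thread finishes synchronizing with its Learner, it sends a NEWLEADER message to that Learner, which answers the LearnerHandler with an ACK. After a quorum of the voting servers has returned this ACK, the Leader starts the LeaderZooKeeperServer, and each LearnerHandler sends an UPTODATE message to its synchronized Learner to signal that synchronization is complete and the Learner can start serving clients.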
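Seen from one LearnerHandler, steps 6 to 8 form a fixed message order. The sketch below encodes that order as a checker, assuming protocol version 0x10000 and ignoring the snapshot body and any proposals/commits queued during synchronization. HandshakeChecker and the "->"/"<-" notation (leader sends / leader receives) are hypothetical.

```java
import java.util.List;

// Hypothetical checker for the leader <-> learner handshake order, seen from the
// leader's LearnerHandler. "->" = leader sends, "<-" = leader receives.
class HandshakeChecker {
    private static final String[][] EXPECTED = {
        {"<-FOLLOWERINFO", "<-OBSERVERINFO"},   // learner registers
        {"->LEADERINFO"},                        // leader announces the new epoch
        {"<-ACKEPOCH"},                          // learner acks and reports lastLoggedZxid
        {"->DIFF", "->TRUNC", "->SNAP"},         // data synchronization mode
        {"->NEWLEADER"},                         // follows the sync data
        {"<-ACK"},                               // learner acks NEWLEADER
        {"->UPTODATE"}                           // learner may start serving
    };

    static boolean follows(List<String> messages) {
        if (messages.size() != EXPECTED.length) {
            return false;
        }
        for (int i = 0; i < EXPECTED.length; i++) {
            if (!List.of(EXPECTED[i]).contains(messages.get(i))) {
                return false;
            }
        }
        return true;
    }
}
```

For example, FOLLOWERINFO, LEADERINFO, ACKEPOCH, DIFF, NEWLEADER, ACK, UPTODATE passes, while a trace where the ACK arrives before NEWLEADER is sent is rejected.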

Note: the Leader server maintains two quorum verifiers:

    // last committed quorum verifier
    public QuorumVerifier quorumVerifier;

    // last proposed quorum verifier
    public QuorumVerifier lastSeenQuorumVerifier = null;

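During the pre-synchronization exchange with the Learners, it is always lastSeenQuorumVerifier that gets passed along. My understanding is that this leaves quorumVerifier.version, which governs the normal commit of transaction requests, unaffected. If I have this wrong, corrections are welcome~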

Follower server startup

The main flow is Follower.followLeader():

void followLeader() throws InterruptedException {
    // ... JMX registration omitted ...
    try {
        // Find the leader chosen by the election and open a TCP connection to it
        QuorumServer leaderServer = findLeader();
        connectToLeader(leaderServer.addr, leaderServer.hostname);
        // Handshake: FOLLOWERINFO -> LEADERINFO -> ACKEPOCH
        long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
        if (self.isReconfigStateChange())
            throw new Exception("learned about role change");
        //check to see if the leader zxid is lower than ours
        //this should never happen but is just a safety check
        long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
        if (newEpoch < self.getAcceptedEpoch()) {
            LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                    + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
            throw new IOException("Error: Epoch of leader is lower");
        }
        // Synchronize data with the leader
        syncWithLeader(newEpochZxid);
        // Main loop: read and process packets from the leader while running
        QuorumPacket qp = new QuorumPacket();
        while (this.isRunning()) {
            readPacket(qp);
            processPacket(qp);
        }
    } catch (Exception e) {
        // ... exception handling omitted ...
    }
}
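registerWithLeader() returns a zxid built with ZxidUtils.makeZxid(newEpoch, 0), and followLeader() extracts the epoch again with ZxidUtils.getEpochFromZxid(). A zxid keeps the leader epoch in its high 32 bits and a per-epoch counter in its low 32 bits. The sketch below re-implements that layout in a standalone class; the name Zxid is hypothetical, and the safety check throws IllegalStateException instead of IOException to keep the example self-contained.

```java
// Same bit layout as ZxidUtils: high 32 bits = epoch, low 32 bits = transaction counter.
class Zxid {
    static long make(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // The safety check from followLeader(): refuse a leader whose epoch is older than ours.
    static void checkLeaderEpoch(long newEpochZxid, long acceptedEpoch) {
        if (epochOf(newEpochZxid) < acceptedEpoch) {
            throw new IllegalStateException("Epoch of leader is lower");
        }
    }
}
```

Because the epoch occupies the high bits, any zxid from a newer epoch compares greater than every zxid from an older one, which is why comparing zxids is enough to decide which server has the newer data.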

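1. Actively connect to the Leader and register with it by sending a FOLLOWERINFO message.
2. Once the TCP connection is up, the Follower receives the LEADERINFO message sent by the Leader and replies with ACKEPOCH; this happens in Learner.registerWithLeader(Leader.FOLLOWERINFO):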

 /**
     * Once connected to the leader, perform the handshake protocol to
     * establish a following / observing connection. 
     * @param pktType
     * @return the zxid the Leader sends for synchronization purposes.
     * @throws IOException
     */
    protected long registerWithLeader(int pktType) throws IOException{
        /*
         * Send follower info, including last zxid and sid
         */
        long lastLoggedZxid = self.getLastLoggedZxid();
        QuorumPacket qp = new QuorumPacket();                
        qp.setType(pktType);
        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
        
        /*
         * Add sid to payload
         */
        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
        boa.writeRecord(li, "LearnerInfo");
        qp.setData(bsid.toByteArray());
        
        writePacket(qp, true);
        readPacket(qp);        
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        if (qp.getType() == Leader.LEADERINFO) {
            // we are connected to a 1.0 server so accept the new epoch and read the next packet
            leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
            byte epochBytes[] = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            if (newEpoch > self.getAcceptedEpoch()) {
                wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
                self.setAcceptedEpoch(newEpoch);
            } else if (newEpoch == self.getAcceptedEpoch()) {
                // since we have already acked an epoch equal to the leaders, we cannot ack
                // again, but we still need to send our lastZxid to the leader so that we can
                // sync with it if it does assume leadership of the epoch.
                // the -1 indicates that this reply should not count as an ack for the new epoch
                wrappedEpochBytes.putInt(-1);
            } else {
                throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
            }
            QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
            writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);
        } else {
            if (newEpoch > self.getAcceptedEpoch()) {
                self.setAcceptedEpoch(newEpoch);
            }
            if (qp.getType() != Leader.NEWLEADER) {
                LOG.error("First packet should have been NEWLEADER");
                throw new IOException("First packet should have been NEWLEADER");
            }
            return qp.getZxid();
        }
    } 

3. Start data synchronization with syncWithLeader(newEpochZxid); see ZooKeeper Source Code Analysis (6): Data and Storage.

4. Once data synchronization is complete, start the LearnerZooKeeperServer and initialize its request processor chain.
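The epoch negotiation in step 2 (the LEADERINFO branch of registerWithLeader()) reduces to one decision about the 4-byte payload of ACKEPOCH. The sketch below isolates that decision; EpochHandshake is a hypothetical class, it keeps acceptedEpoch in a field where the real code persists it via setAcceptedEpoch(), and it throws IllegalStateException instead of IOException.

```java
import java.nio.ByteBuffer;

// Hypothetical standalone version of the LEADERINFO branch of Learner.registerWithLeader().
class EpochHandshake {
    long acceptedEpoch;          // highest epoch this learner has agreed to
    final long currentEpoch;     // epoch this learner last fully joined

    EpochHandshake(long acceptedEpoch, long currentEpoch) {
        this.acceptedEpoch = acceptedEpoch;
        this.currentEpoch = currentEpoch;
    }

    // Returns the 4-byte payload of the ACKEPOCH reply to a LEADERINFO carrying newEpoch.
    byte[] ackEpochPayload(long newEpoch) {
        ByteBuffer payload = ByteBuffer.allocate(4);
        if (newEpoch > acceptedEpoch) {
            payload.putInt((int) currentEpoch);   // report our current epoch...
            acceptedEpoch = newEpoch;             // ...and accept the leader's new epoch
        } else if (newEpoch == acceptedEpoch) {
            payload.putInt(-1);                   // already acked this epoch: must not count again
        } else {
            throw new IllegalStateException("Leader's epoch " + newEpoch
                    + " is less than accepted epoch " + acceptedEpoch);
        }
        return payload.array();
    }
}
```

A learner that already accepted the proposed epoch still answers (so the leader learns its lastLoggedZxid and can sync it), but the -1 keeps that reply from being counted twice toward the epoch quorum.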

Observer server startup

The main flow is Observer.observeLeader():

void observeLeader() throws Exception {
    // ... JMX registration omitted ...
    try {
        QuorumServer leaderServer = findLeader();
        connectToLeader(leaderServer.addr, leaderServer.hostname);
        // Same handshake as a follower, but registering with OBSERVERINFO
        long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
        if (self.isReconfigStateChange())
            throw new Exception("learned about role change");

        syncWithLeader(newLeaderZxid);
        QuorumPacket qp = new QuorumPacket();
        while (this.isRunning()) {
            readPacket(qp);
            processPacket(qp);
        }
    } catch (Exception e) {
        // ... exception handling omitted ...
    }
}

1. Actively connect to the Leader and register with it by sending an OBSERVERINFO message.
2. Once the TCP connection is up, the Observer receives the LEADERINFO message sent by the Leader and replies with ACKEPOCH, mainly to tell the Leader its current lastLoggedZxid and epochBytes; this happens in Learner.registerWithLeader(Leader.OBSERVERINFO).
3. Start data synchronization with syncWithLeader(newLeaderZxid); see ZooKeeper Source Code Analysis (6): Data and Storage.

4. Once data synchronization is complete, start the LearnerZooKeeperServer and initialize its request processor chain.
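"Initializing the request processor chain" means wiring processors so that each hands a request to the next. The sketch below models only the wiring order of the main chains, FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor for a Follower and ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor for an Observer. The interface Processor and the classes Stage and ChainDemo are hypothetical; the real processors are threaded and queue-based, and a Follower additionally runs a SyncRequestProcessor -> SendAckRequestProcessor path for proposals.

```java
import java.util.List;

// Hypothetical chain-of-responsibility model: each stage handles a request and passes it on.
interface Processor {
    void process(String request, List<String> trail);
}

class Stage implements Processor {
    private final String name;
    private final Processor next;                 // null for the last stage

    Stage(String name, Processor next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void process(String request, List<String> trail) {
        trail.add(name);                          // a real processor would act on the request here
        if (next != null) {
            next.process(request, trail);
        }
    }
}

class ChainDemo {
    // Wired back to front: the final stage is created first, the entry stage last.
    static Processor followerChain() {
        Processor last = new Stage("FinalRequestProcessor", null);
        Processor commit = new Stage("CommitProcessor", last);
        return new Stage("FollowerRequestProcessor", commit);
    }

    static Processor observerChain() {
        Processor last = new Stage("FinalRequestProcessor", null);
        Processor commit = new Stage("CommitProcessor", last);
        return new Stage("ObserverRequestProcessor", commit);
    }
}
```

Building the chain from the last stage backwards lets each constructor receive its already-built successor, so no stage ever points at an uninitialized neighbor.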

From then on, when the Leader node goes down or the Leader loses contact with more than half of its Followers, communication between the nodes throws exceptions. Leader.lead() or Follower.followLeader() then exits its loop and returns to QuorumPeer.run(), where each node closes all of its connections, sets its state back to LOOKING, and starts a new round of election.
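The fall-back to LOOKING can be pictured as a loop over the server state. The sketch below is a heavily simplified model under stated assumptions: PeerState is a hypothetical copy of QuorumPeer.ServerState, scripted results stand in for the real election, and each non-LOOKING round stands for one call to lead(), followLeader() or observeLeader() that eventually returns when the leader is lost.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical copy of the values of QuorumPeer.ServerState.
enum PeerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

// Hypothetical, heavily simplified QuorumPeer.run() loop.
class PeerLoop {
    static List<PeerState> run(Iterator<PeerState> electionResults, int rounds) {
        List<PeerState> history = new ArrayList<>();
        PeerState state = PeerState.LOOKING;
        for (int i = 0; i < rounds; i++) {
            history.add(state);
            if (state == PeerState.LOOKING) {
                state = electionResults.next();   // role decided by this round's election
            } else {
                // the role method has returned: close connections, then elect again
                state = PeerState.LOOKING;
            }
        }
        return history;
    }
}
```

For example, with scripted results FOLLOWING then LEADING, four rounds produce LOOKING, FOLLOWING, LOOKING, LEADING: a server that loses its leader goes back through an election before taking on any new role.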

Thanks for reading! I'm Monica23334 || Monica2333, the one who has vowed to write one original article every week. Follow me and look forward to watching me eat my words~

© Copyright belongs to the author. For reposting or content collaboration, please contact the author.