ZooKeeper ZAB Leader Election Source Code Analysis

Preface

A ZooKeeper cluster made up of several nodes has to elect one node as the Leader, which then serves the cluster. So what does ZooKeeper's leader election protocol look like? Let's find out.

Election protocol

ZooKeeper divides the nodes of a cluster into two types:

  • participant: takes part in the election
  • observer: cannot take part in the election

Nodes of the participant type take part in the leader election, which proceeds as follows:

  1. After starting, each node creates its own vote, which carries three main pieces of information:
     id: the id of the node proposed as leader, by default the node itself
     zxid: the latest transaction id processed by this machine
     electionEpoch: the identifier of the election round
  2. Each node sends its current vote to the other nodes taking part in the election.
  3. Each node receives votes (r_vote) sent by other servers and decides whether to update its own vote according to these rules:
     1. Compare vote.zxid with r_vote.zxid. If vote.zxid < r_vote.zxid, set vote.id to r_vote.id, meaning this node now backs the node proposed by r_vote. If vote.zxid > r_vote.zxid, keep the current vote. If vote.zxid == r_vote.zxid, apply rule 2.
     2. Compare vote.id with r_vote.id. If vote.id > r_vote.id, keep the current vote; if vote.id < r_vote.id, update the vote.
     (In the source, totalOrderPredicate also compares peerEpoch before zxid.)
  4. Update the vote information.
  5. Check whether some node has received more than half of the votes; if so, that node is elected leader.
  6. If no node has more than half of the votes, repeat from step 3.
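The comparison rule for votes can be sketched as a small standalone function. This is only an illustration under simplifying assumptions, not ZooKeeper's code: `VoteOrder` and `SimpleVote` are hypothetical names, and only zxid and id are compared, as in the description above.

```java
final class VoteOrder {
    /** Hypothetical minimal vote: the proposed leader id and the zxid it carries. */
    static final class SimpleVote {
        final long id;
        final long zxid;

        SimpleVote(long id, long zxid) {
            this.id = id;
            this.zxid = zxid;
        }
    }

    /** Returns true when the received vote should replace the current one. */
    static boolean shouldAdopt(SimpleVote current, SimpleVote received) {
        if (received.zxid != current.zxid) {
            return received.zxid > current.zxid; // the more up-to-date log wins
        }
        return received.id > current.id; // tie on zxid: the larger id wins
    }

    public static void main(String[] args) {
        SimpleVote mine = new SimpleVote(1, 0x100);
        System.out.println(shouldAdopt(mine, new SimpleVote(2, 0x100))); // true
        System.out.println(shouldAdopt(mine, new SimpleVote(3, 0x0ff))); // false
    }
}
```

With this ordering, all nodes that see the same set of votes converge on the same candidate, which is what lets a majority form.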
tips

One thing worth mentioning: every node has an electionEpoch attribute when it starts an election, and within the same election round all nodes should have the same electionEpoch. If a node's electionEpoch is smaller than that of the other nodes, the node has fallen behind them. It then has to clear the votes it has collected, update its electionEpoch, and join the new election round.
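A minimal sketch of this electionEpoch check, assuming a received vote is classified purely by comparing round numbers. `EpochCheck` and its `Action` values are hypothetical names introduced for illustration.

```java
final class EpochCheck {
    enum Action {
        ADOPT_NEWER_ROUND, // we are behind: clear collected votes and move to the sender's round
        DISCARD,           // the sender is behind: ignore its vote
        COMPARE_VOTES      // same round: compare the votes themselves
    }

    static Action classify(long localEpoch, long receivedEpoch) {
        if (receivedEpoch > localEpoch) {
            return Action.ADOPT_NEWER_ROUND;
        }
        if (receivedEpoch < localEpoch) {
            return Action.DISCARD;
        }
        return Action.COMPARE_VOTES;
    }

    public static void main(String[] args) {
        System.out.println(classify(3, 5)); // ADOPT_NEWER_ROUND
        System.out.println(classify(5, 3)); // DISCARD
        System.out.println(classify(4, 4)); // COMPARE_VOTES
    }
}
```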

Threads involved in the election

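- WorkerSender

Sends this machine's votes to other servers (no network operation happens here; it only takes votes from the send queue and hands them on to the outgoing queues)

- WorkerReceiver

Receives votes sent by other servers (no network operation happens here either; it only takes votes that other servers have sent from the receive queue)

Every node taking part in the vote establishes a network connection to every other voting node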

- SendWorker

Each connection has a SendWorker that sends votes over the network to the corresponding node

- ReceiveWorker

Each connection has a ReceiveWorker (the class is named RecvWorker in the source) that receives, over the network, the votes sent by other nodes

- ListenerHandler

The thread on each node that handles connection requests from other nodes

- QuorumPeer

Dynamically updates its vote according to the votes received from other nodes and checks whether a leader has been elected. If a leader has been elected, it leaves the election and enters the data recovery phase; otherwise the election continues

The flowchart below shows how these threads interact


leader_elect_thread_exchange.png
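The hand-off between these threads is built on blocking queues. The sketch below is a simplified assumption of that pattern rather than ZooKeeper's code: `QueueHandoff` is a hypothetical class, votes are plain strings, and "the network" is replaced by a loopback that moves each message from the send queue straight to the receive queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class QueueHandoff {
    private final BlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> recvqueue = new LinkedBlockingQueue<>();
    private volatile boolean stop = false;

    /** Starts a daemon worker that drains sendqueue, like a sender thread would. */
    private void startWorker() {
        Thread worker = new Thread(() -> {
            while (!stop) {
                try {
                    String m = sendqueue.poll(100, TimeUnit.MILLISECONDS);
                    if (m != null) {
                        recvqueue.put(m); // stand-in for "send over the network"
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "LoopbackWorker");
        worker.setDaemon(true);
        worker.start();
    }

    /** Queues one vote and waits up to two seconds for it to come back. */
    static String roundTrip(String vote) {
        QueueHandoff q = new QueueHandoff();
        q.startWorker();
        q.sendqueue.offer(vote);
        try {
            return q.recvqueue.poll(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            q.stop = true;
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("vote(leader=1)"));
    }
}
```

The election thread never touches a socket in this arrangement; it only offers to one queue and polls from another, which keeps the election logic independent of network delays.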

With this groundwork in place, let's start analyzing the source code of ZooKeeper's leader election

Node startup entry point

QuorumPeerMain is the startup entry class of every server node

initializeAndRun

is the startup entry method. It does three main things:

  1. Parses zoo.cfg into the properties of QuorumPeerConfig
  2. Starts DatadirCleanupManager to periodically purge expired snapshot files
  3. Starts the node through runFromConfig
runFromConfig

This method is long, so I have annotated the main points with comments

 public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
     
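            // A large chunk above is omitted (including the opening try of this block); it does not affect understanding
            if (config.getClientPortAddress() != null) {
                // Get the server-side I/O factory class; the default is NIOServerCnxnFactory
                cnxnFactory = ServerCnxnFactory.createFactory();
                // Configure the ServerCnxnFactory: port, maximum number of client connections, creation of the SelectorThread and ExpiredThread classes, and so on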
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
            }

            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
            }
            // QuorumPeer is the class that represents a server node; everything that happens next involves it
            quorumPeer = getQuorumPeer();
            // Set the accessor class for the data and log directories
            quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
            quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
            quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
            //quorumPeer.setQuorumPeers(config.getAllMembers());
            // Set the leader election algorithm; currently there is only one: FastLeaderElection
            quorumPeer.setElectionType(config.getElectionAlg());
            // Set this node's sid
            quorumPeer.setMyid(config.getServerId());
            quorumPeer.setTickTime(config.getTickTime());
            quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
            quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
            quorumPeer.setInitLimit(config.getInitLimit());
            quorumPeer.setSyncLimit(config.getSyncLimit());
            quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
            quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
            quorumPeer.setConfigFileName(config.getConfigFilename());
            quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
            // Set ZooKeeper's database; note that no data has been recovered at this point
            quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
            quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
            if (config.getLastSeenQuorumVerifier() != null) {
                quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
            }
            quorumPeer.initConfigInZKDatabase();
            quorumPeer.setCnxnFactory(cnxnFactory);
            quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
            quorumPeer.setSslQuorum(config.isSslQuorum());
            quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
            // Set the node type: participant or observer
            quorumPeer.setLearnerType(config.getPeerType());
            quorumPeer.setSyncEnabled(config.getSyncEnabled());
            // A chunk of code is omitted here
            // Initialize quorumPeer; this mainly creates the helper classes for the authentication service
            quorumPeer.initialize();

            if (config.jvmPauseMonitorToRun) {
                quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
            }
            // Start quorumPeer
            quorumPeer.start();
            ZKAuditProvider.addZKStartStopAuditLog();
          
            quorumPeer.join();
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Quorum Peer interrupted", e);
        } finally {
            if (metricsProvider != null) {
                try {
                    metricsProvider.stop();
                } catch (Throwable error) {
                    LOG.warn("Error while stopping metrics", error);
                }
            }
        }
    }
QuorumPeer.start()

This is where QuorumPeer starts

   public synchronized void start() {
        // Check that this node is included in the server list from the configuration file
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        // Recover the node's data; see http://www.reibang.com/p/f10ffc0ff861
        loadDataBase();
        // Start the SelectorThread and AcceptThread to get ready for client requests; see http://www.reibang.com/p/8153a113fdf7
        startServerCnxnFactory();
//        try {
//            adminServer.start();
//        } catch (AdminServerException e) {
//            LOG.warn("Problem starting AdminServer", e);
//            System.out.println(e);
//        }
        // Start the cluster's leader election
        startLeaderElection();
        startJvmPauseMonitor();
        // QuorumPeer is itself a thread; start it now
        super.start();
    }

startLeaderElection

The startLeaderElection method creates some of the threads needed during leader election

public synchronized void startLeaderElection() {
        try {
            if (getPeerState() == ServerState.LOOKING) {
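                // Set the current vote. It holds three main pieces of information: the id of the proposed leader, this node's latest transaction id (zxid), and the node's current epoch (stored as the vote's peerEpoch).
                // Every node nominates itself as Leader at startup. Hmm, rather bold.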
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            }
        } catch (IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }
        // Create the election algorithm
        this.electionAlg = createElectionAlgorithm(electionType);
    }
createElectionAlgorithm

Let's look at the source directly

 protected Election createElectionAlgorithm(int electionAlgorithm) {
        Election le = null;

        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 1:
            throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
        case 2:
            throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
        // ZooKeeper currently supports only one election algorithm
        case 3:
            // QuorumCnxManager is the class QuorumPeer uses to manage socket connections to other nodes
            QuorumCnxManager qcm = createCnxnManager();
            // Use qcmRef to check whether an old QuorumCnxManager already exists; if so, shut it down
            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
            if (oldQcm != null) {
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                oldQcm.halt();
            }
            // Listener is the class that manages the ListenerHandlers
            QuorumCnxManager.Listener listener = qcm.listener;
            if (listener != null) {
                // Start the listener, which in turn starts each ListenerHandler
                listener.start();
                // Create the FastLeaderElection
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                // start() launches WorkerSender and WorkerReceiver
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }
tips
  1. Why does QuorumCnxManager use a Listener to manage ListenerHandlers?
    A server node may have several network interfaces and may open a listening port on the IP address of each of them. In that case one QuorumCnxManager can contain several ListenerHandlers, so a single Listener is used to manage them.
FastLeaderElection

What happens when a FastLeaderElection is created

  1. The queues QuorumPeer uses to send and receive messages, sendqueue and recvqueue, are created
private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;
        // Create the sendqueue and recvqueue objects
        sendqueue = new LinkedBlockingQueue<ToSend>();
        recvqueue = new LinkedBlockingQueue<Notification>();
        // Create the Messenger, which manages WorkerSender and WorkerReceiver
        this.messenger = new Messenger(manager);
    }

  2. A Messenger is created; the Messenger class creates WorkerSender and WorkerReceiver to process the data in sendqueue and recvqueue

Messenger(QuorumCnxManager manager) {

            this.ws = new WorkerSender(manager);

            this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
            this.wsThread.setDaemon(true);

            this.wr = new WorkerReceiver(manager);

            this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
            this.wrThread.setDaemon(true);
        }

QuorumPeer.run

Let's see what logic the QuorumPeer thread runs


try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                // Leader election logic
                case LOOKING:
                    LOG.info("LOOKING");
                       // omitted .....
                    
                        try {
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                startLeaderElection();
                            }
                            // QuorumPeer enters the election logic
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    break;
                // Logic for acting as an observer
                case OBSERVING:
                    // omitted ............
                    break;
                // Logic for acting as a follower
                case FOLLOWING:
                    // omitted ............
                    break;
                // Logic for acting as the Leader
                case LEADING:
                    // omitted ............
                    break;
                }
            }
        } finally {
           // this part of the code is left out
        }
    }
FastLeaderElection.lookForLeader

The election itself happens in lookForLeader. The method is long, about 200 lines, so I will remove some of the less important code.

 public Vote lookForLeader() throws InterruptedException {
          // Some JMX-related code was removed here

        self.start_fle = Time.currentElapsedTime();
        try {
            /*
             * The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
             * if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
             * of participants has voted for it.
             */
             // The English comment above is already clear: recvset holds the votes received from each server.
             // The key is the server's sid and the value is the vote that server proposes; recvset is used to determine whether a leader has been elected
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();

            /*
             * The votes from previous leader elections, as well as the votes from the current leader election are
             * stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
             * Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
             * outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
             * the electionEpoch of the received notifications) in a leader election.
             */
            // outofelection holds votes from nodes that are already FOLLOWING or LEADING; a node that arrives late uses it to learn which node is the Leader
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                // logicalclock identifies the round of each election
                logicalclock.incrementAndGet();
                // Update the Leader this node proposes (proposedLeader, proposedZxid, proposedEpoch)
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
            // Send our own proposal to the other servers
            sendNotifications();

            SyncedLearnerTracker voteSet;

            /*
             * Loop in which we exchange notifications until we find a leader
             */
            
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                // Take a vote sent by another server from recvqueue (this includes votes this node sent to itself)
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if (n == null) {
                     // Nothing was received from recvqueue.
                     // If the messages queued earlier have already been delivered to the other server nodes, send the notifications again
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        // Otherwise QuorumPeer has QuorumCnxManager establish network connections to the other server nodes
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                     // Double notTimeout, capped at maxNotificationInterval
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    LOG.info("Notification time out: {}", notTimeout);
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view for a replica in the current or next voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        // If notification > current, replace and send messages out
                   
                        if (n.electionEpoch > logicalclock.get()) {
                            // If the received vote belongs to a later round than logicalclock, update logicalclock and
                            // clear the votes received so far
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            // totalOrderPredicate compares the received vote with this node's vote in the way described at the start of the article (in the source it compares peerEpoch first, then zxid, then id).
                            // It decides whether this node's vote needs updating; after an update, the new proposal is sent to the other server nodes
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                                 // If the electionEpoch of the received vote is smaller than this machine's election round, discard the vote
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                    Long.toHexString(n.electionEpoch),
                                    Long.toHexString(logicalclock.get()));
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                             // Same as the analysis of totalOrderPredicate above
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        LOG.debug(
                            "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                            n.sid,
                            n.leader,
                            Long.toHexString(n.zxid),
                            Long.toHexString(n.electionEpoch));

                        // don't care about the version if it's in LOOKING state
                        // Add the received vote to recvset
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        // Build a vote tracker (SyncedLearnerTracker) from recvset and this node's vote
                        // The tracker tells whether this node's vote is backed by more than half of the nodes
                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader,proposedZxid , logicalclock.get(), proposedEpoch));
                        if (voteSet.hasAllQuorums()) {
                            // Even when this node's vote is backed by more than half of the participants, keep polling recvqueue, waiting at most finalizeWait ms each time, to see whether a newly arrived vote would replace it

                            // Verify if there is any change in the proposed leader
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                // If no vote at all arrived after waiting finalizeWait, everyone accepts the node proposed by this machine's vote as the Leader
                                // Set the QuorumPeer state from proposedLeader and this machine's sid:
                                // if proposedLeader == sid, this node becomes LEADING; otherwise a participant becomes FOLLOWING and an observer becomes OBSERVING
                                setPeerState(proposedLeader, voteSet);
                                // Build the final vote that represents the elected Leader
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                // leaveInstance clears recvqueue, marking the end of this election round
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                      // If the state of the received vote is OBSERVING, do nothing

                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                         // A question arises here: when does a node receive a vote whose state is FOLLOWING or LEADING?
                         // Put differently, such a vote means the cluster has already elected its Leader.
                         // For example, when a new node joins a cluster, it receives votes from the other server nodes, and those votes are in the FOLLOWING or LEADING state. This is related to how WorkerReceiver works.
                         // Handling of a received vote whose state is LEADING or FOLLOWING:
                        if (n.electionEpoch == logicalclock.get()) {
                            // Same election round: add the vote to recvset directly
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            // Use voteSet to check whether more than half of the participants back the current vote.leader; the Leader server must also have sent its own vote to this node
                            if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                                setPeerState(n.leader, voteSet);
                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         *
                         * Note that the outofelection map also stores votes from the current leader election.
                         * See ZOOKEEPER-1732 for more information.
                         */
                        // If the same-round check above did not return, the vote is recorded in outofelection, which is then used to find the Leader
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized (this) {
                                logicalclock.set(n.electionEpoch);
                                setPeerState(n.leader, voteSet);
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }
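
The timeout handling in the loop above is a capped exponential backoff. A minimal sketch of just that step follows; `Backoff` is a hypothetical class, and the values 200 ms and 60000 ms are assumptions standing in for minNotificationInterval and maxNotificationInterval.

```java
final class Backoff {
    /** Doubles the current timeout without ever exceeding the cap. */
    static int nextTimeout(int current, int max) {
        return Math.min(current * 2, max);
    }

    public static void main(String[] args) {
        int timeout = 200;     // assumed starting interval, in ms
        final int max = 60000; // assumed cap, in ms
        for (int i = 0; i < 10; i++) {
            System.out.println(timeout);
            timeout = nextTimeout(timeout, max);
        }
    }
}
```

Doubling keeps an isolated node from flooding its peers with notifications, while the cap bounds how long it can take to notice that the cluster has become reachable again.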

The logic of the code above can be described with the following diagram


leader_election_logical.png
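
The core test in this logic is whether a proposal is backed by more than half of the voting members. The sketch below shows that check in its simplest form, assuming a plain majority quorum; `QuorumCheck` is a hypothetical class, and the real code delegates this to the vote tracker and the configured quorum verifier.

```java
import java.util.HashMap;
import java.util.Map;

final class QuorumCheck {
    /**
     * recvset maps a voter's sid to the sid of the leader it proposes.
     * Returns true when strictly more than half of the voting members
     * propose proposedLeader.
     */
    static boolean hasMajority(Map<Long, Long> recvset, long proposedLeader, int votingMembers) {
        long supporters = recvset.values().stream()
                .filter(leader -> leader == proposedLeader)
                .count();
        return supporters > votingMembers / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> recvset = new HashMap<>();
        recvset.put(1L, 3L);
        System.out.println(hasMajority(recvset, 3L, 3)); // false: 1 of 3
        recvset.put(3L, 3L);
        System.out.println(hasMajority(recvset, 3L, 3)); // true: 2 of 3
    }
}
```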

That is how the QuorumPeer thread works during the election.
Next, let's look at some of the details, which involve the other threads I mentioned earlier.

sendNotifications

Whenever a server node has just started, or has updated its own proposal after receiving an r_vote from another node, it has to send the Leader it proposes to the other participants through the sendNotifications method. Let's analyze the source of sendNotifications.

 private void sendNotifications() {
        for (long sid : self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier qv = self.getQuorumVerifier();
            // Wrap the Leader this node proposes in a ToSend object and add it to sendqueue
            ToSend notmsg = new ToSend(
                ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch,
                qv.toString().getBytes());

            LOG.debug(
                "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
                    + " {} (myid), 0x{} (n.peerEpoch) ",
                proposedLeader,
                Long.toHexString(proposedZxid),
                Long.toHexString(logicalclock.get()),
                sid,
                self.getId(),
                Long.toHexString(proposedEpoch));

            sendqueue.offer(notmsg);
        }
    }
WorkerSender.run

Let's look at the run method of the WorkerSender thread, which consumes the sendqueue queue

public void run() {
                while (!stop) {
                    try {
                        // Take a ToSend message from sendqueue and hand it to process
                        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                        if (m == null) {
                            continue;
                        }

                        process(m);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
                LOG.info("WorkerSender is down");
            }
WorkerSender.process
 void process(ToSend m) {
                // Convert the ToSend into a ByteBuffer
                ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
                // Send requestBuffer to the given participant through QuorumCnxManager
                manager.toSend(m.sid, requestBuffer);

            }
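
To make the "convert to ByteBuffer" step concrete, here is a sketch of packing a notification's fields into a fixed-order binary layout. The layout is an assumption chosen for illustration (state, leader, zxid, electionEpoch, peerEpoch, then a length-prefixed config blob) and `NotificationCodec` is a hypothetical class; the real buildMsg may include further fields such as a protocol version.

```java
import java.nio.ByteBuffer;

final class NotificationCodec {
    /** Packs the fields in a fixed order: one int, four longs, then length-prefixed bytes. */
    static ByteBuffer pack(int state, long leader, long zxid,
                           long electionEpoch, long peerEpoch, byte[] configData) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 * 4 + 4 + configData.length);
        buf.putInt(state);
        buf.putLong(leader);
        buf.putLong(zxid);
        buf.putLong(electionEpoch);
        buf.putLong(peerEpoch);
        buf.putInt(configData.length);
        buf.put(configData);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    /** Reads the proposed leader, which sits right after the 4-byte state field. */
    static long readLeader(ByteBuffer buf) {
        return buf.getLong(4);
    }

    /** Reads the zxid, which follows the 8-byte leader field. */
    static long readZxid(ByteBuffer buf) {
        return buf.getLong(12);
    }

    public static void main(String[] args) {
        ByteBuffer msg = pack(0, 3L, 0x100L, 1L, 1L, new byte[] {7});
        System.out.println(msg.remaining());  // 41
        System.out.println(readLeader(msg));  // 3
    }
}
```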
QuorumCnxManager.toSend

Let's analyze what happens in the toSend method of the participant's connection manager

 public void toSend(Long sid, ByteBuffer b) {                                                                                           
     /*                                                                                                                                 
      * If sending message to myself, then simply enqueue it (loopback).                                                                
      */ 
     // If the vote is addressed to this node itself, put it straight into recvQueue
     if (this.mySid == sid) {                                                                                                           
         b.position(0);                                                                                                                 
         addToRecvQueue(new Message(b.duplicate(), sid));                                                                               
         /*                                                                                                                             
          * Otherwise send to the corresponding thread to send.                                                                         
          */                                                                                                                            
     } else {                                                                                                                           
         /*                                                                                                                             
          * Start a new connection if doesn't have one already.                                                                         
          */  
         // queueSendMap is a ConcurrentHashMap that holds, per sid, the queue of messages this node has to send to that node
         BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid, serverId -> new CircularBlockingQueue<>(SEND_CAPACITY));
         // Put the vote destined for the node with this sid into its blocking queue
         addToSendQueue(bq, b);
         // Establish a socket connection from this node to the node with this sid
         connectOne(sid);                                                                                                               
     }                                                                                                                                  
 }                                                                                                                                      
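
The routing decision in toSend can be sketched without any networking. The version below is a simplified assumption, not the real class: `SendRouter` is hypothetical, an `ArrayBlockingQueue` that drops its oldest entry when full stands in for CircularBlockingQueue, and a capacity of 1 is assumed for SEND_CAPACITY.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

final class SendRouter {
    static final int SEND_CAPACITY = 1; // assumed value
    final long mySid;
    final BlockingQueue<ByteBuffer> recvQueue = new LinkedBlockingQueue<>();
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();

    SendRouter(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, ByteBuffer b) {
        if (sid == mySid) {
            // Loopback: a message to ourselves never touches the network.
            recvQueue.offer(b.duplicate());
            return;
        }
        // Lazily create the per-peer outgoing queue.
        BlockingQueue<ByteBuffer> bq =
                queueSendMap.computeIfAbsent(sid, id -> new ArrayBlockingQueue<>(SEND_CAPACITY));
        // When the queue is full, drop the oldest message so the newest vote wins.
        while (!bq.offer(b)) {
            bq.poll();
        }
    }

    public static void main(String[] args) {
        SendRouter router = new SendRouter(1L);
        router.toSend(1L, ByteBuffer.allocate(4));
        router.toSend(2L, ByteBuffer.allocate(4));
        router.toSend(2L, ByteBuffer.allocate(8));
        System.out.println(router.recvQueue.size());              // 1
        System.out.println(router.queueSendMap.get(2L).size());   // 1
    }
}
```

Keeping only the newest message per peer is reasonable for votes, because a later vote from the same node supersedes the earlier one.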
tips

Before walking through the connectOne method, let's look at the network connection topology between ZooKeeper's voting nodes. The diagram below shows the connection topology for three nodes


connection_topology.png

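Every node establishes a connection with every other node. On each connection, ZooKeeper handles outgoing vote messages with a SendWorker and incoming vote messages with a ReceiveWorker; both are thread classes. Because any two nodes need a connection between them, ZooKeeper imposes the following constraint so that these connections are set up efficiently and stably, without redundant duplicates:
Only the machine with the larger sid may actively open a connection to the machine with the smaller sid. For example, take two machines with sid 1 and sid 2 that need to connect.
If the server with sid=1 initiates a socket connection to the server with sid=2, that connection is not kept. Once the underlying socket is established, ZooKeeper compares the local sid with the remote server's sid, and if the local sid is the smaller one it closes the socket itself. If the server with sid=2 opens a socket connection to the server with sid=1, the connection is established successfully.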
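
The direction rule above can be condensed into a single comparison. The following is only a minimal sketch; the class and method names are hypothetical and are not part of ZooKeeper. It mirrors the check made by the initiating side after the socket is open.

```java
// Minimal sketch of the connection direction rule; names are hypothetical, not ZooKeeper's API.
final class ConnectionDirectionSketch {

    // Returns true when the side that opened the socket keeps it.
    // The initiator drops the connection only when the remote sid is larger than its own.
    static boolean initiatorKeepsConnection(long mySid, long remoteSid) {
        return remoteSid <= mySid;
    }

    public static void main(String[] args) {
        // sid=1 dials sid=2: the initiator has the smaller sid, so it closes the socket.
        System.out.println(initiatorKeepsConnection(1, 2)); // false
        // sid=2 dials sid=1: the initiator has the larger sid, so the connection stays.
        System.out.println(initiatorKeepsConnection(2, 1)); // true
    }
}
```

With this rule, each pair of nodes ends up with one connection, opened from the larger sid toward the smaller one.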

QuorumCnxManager.connectOne

The connectOne method is what builds the connection topology shown in the figure above.

 synchronized void connectOne(long sid) {  
     // senderWorkerMap holds the SendWorker for each sid
     if (senderWorkerMap.get(sid) != null) {   
         // A SendWorker already exists for sid: optionally run the multi-address reachability check, then return
         LOG.debug("There is a connection already for server {}", sid);                                                  
         if (self.isMultiAddressEnabled() && self.isMultiAddressReachabilityCheckEnabled()) {                            
             // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the  
             // one we are using is already dead and we need to clean-up, so when we will create a new connection        
             // then we will choose an other one, which is actually reachable                                            
             senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();                                           
         }                                                                                                               
         return;                                                                                                         
     }                                                                                                                   
     synchronized (self.QV_LOCK) {                                                                                       
         boolean knownId = false;                                                                                        
         // Resolve hostname for the remote server before attempting to                                                  
         // connect in case the underlying ip address has changed.                                                       
         self.recreateSocketAddresses(sid);                                                                              
         Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();                                          
         QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();                                                   
         Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();                               
         if (lastCommittedView.containsKey(sid)) {                                                                       
             knownId = true;                                                                                             
             LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getId(), sid);   
             // No socket connection to the server for sid exists yet, so establish one via connectOne
             if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {                                             
                 return;                                                                                                 
             }                                                                                                           
         }                                                                                                               
         if (lastSeenQV != null                                                                                          
             && lastProposedView.containsKey(sid)                                                                        
             && (!knownId                                                                                                
                 || (lastProposedView.get(sid).electionAddr != lastCommittedView.get(sid).electionAddr))) {              
             knownId = true;                                                                                             
             LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getId(), sid);                  
                                                                                                                         
             if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {                                              
                 return;                                                                                                 
             }                                                                                                           
         }                                                                                                               
         if (!knownId) {                                                                                                 
             LOG.warn("Invalid server id: {} ", sid);                                                                    
         }                                                                                                               
     }                                                                                                                   
 }                                                                                                                       

The connectOne(sid, address) overload called above goes on to call initiateConnectionAsync().

QuorumCnxManager.initiateConnectionAsync

The initiateConnectionAsync method wraps the connection task in a QuorumConnectionReqThread and completes it asynchronously.

public boolean initiateConnectionAsync(final MultipleAddresses electionAddr, final Long sid) {                          
    if (!inprogressConnections.add(sid)) {                                                                              
        // simply return as there is a connection request to                                                            
        // server 'sid' already in progress.                                                                            
        LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid);         
        return true;                                                                                                    
    }                                                                                                                   
    try {                                                                                                               
        connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));                                   
        connectionThreadCnt.incrementAndGet();                                                                          
    } catch (Throwable e) {                                                                                             
        // Imp: Safer side catching all type of exceptions and remove 'sid'                                             
        // from inprogress connections. This is to avoid blocking further                                               
        // connection requests from this 'sid' in case of errors.                                                       
        inprogressConnections.remove(sid);                                                                              
        LOG.error("Exception while submitting quorum connection request", e);                                           
        return false;                                                                                                   
    }                                                                                                                   
    return true;                                                                                                        
}                                                                                                                       
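
The listing above relies on Set.add returning false to skip a request when one is already in flight for the same sid. Below is a minimal, self-contained sketch of that pattern. All names are hypothetical. Clearing the in-progress entry when the task ends is an assumption of this sketch; the listing above only shows the removal on a failed submission.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of "skip the request if one is already in flight"; names are hypothetical.
final class AsyncConnectSketch {
    private final Set<Long> inProgress = ConcurrentHashMap.newKeySet();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    final AtomicInteger submitted = new AtomicInteger();

    // Returns true if an attempt for sid is already running or was just queued.
    boolean initiateAsync(long sid, Runnable connectTask) {
        if (!inProgress.add(sid)) {
            return true; // add() returned false: an attempt for this sid is in flight
        }
        try {
            executor.execute(() -> {
                try {
                    connectTask.run();
                } finally {
                    inProgress.remove(sid); // assumption: allow retries once the attempt ends
                }
            });
            submitted.incrementAndGet();
            return true;
        } catch (RuntimeException e) {
            inProgress.remove(sid); // submission failed, so do not block later requests
            return false;
        }
    }

    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncConnectSketch sketch = new AsyncConnectSketch();
        sketch.initiateAsync(1L, () -> System.out.println("connecting to sid 1"));
        sketch.shutdown();
    }
}
```

Using a concurrent set as the guard keeps the check and the registration atomic, so two callers racing on the same sid cannot both submit a task.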
QuorumConnectionReqThread

This is a thread class whose main job is to establish the socket connection to a given server.
Let's look at initiateConnection, which its run method calls.

 public void initiateConnection(final MultipleAddresses electionAddr, final Long sid) {               
     Socket sock = null;                                                                              
     try {                                                                                            
         LOG.debug("Opening channel to server {}", sid);                                              
         if (self.isSslQuorum()) {                                                                    
             sock = self.getX509Util().createSSLSocket();                                             
         } else {    
             // Create the socket through the factory
             sock = SOCKET_FACTORY.get();                                                             
         }                                                                                            
         setSockOpts(sock);    
         // Connect to the remote server
         sock.connect(electionAddr.getReachableOrOne(), cnxTO);                                       
         if (sock instanceof SSLSocket) {                                                             
             SSLSocket sslSock = (SSLSocket) sock;                                                    
             sslSock.startHandshake();                                                                
             LOG.info("SSL handshake complete with {} - {} - {}",                                     
                      sslSock.getRemoteSocketAddress(),                                               
                      sslSock.getSession().getProtocol(),                                             
                      sslSock.getSession().getCipherSuite());                                         
         }                                                                                            
                                                                                                      
         LOG.debug("Connected to server {} using election address: {}:{}",                            
                   sid, sock.getInetAddress(), sock.getPort());                                       
     } catch (X509Exception e) {                                                                      
         LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e);   
         closeSocket(sock);                                                                           
         return;                                                                                      
     } catch (UnresolvedAddressException | IOException e) {                                           
         LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);          
         closeSocket(sock);                                                                           
         return;                                                                                      
     }                                                                                                
                                                                                                      
     try {    
         // This method is analyzed below
         startConnection(sock, sid);                                                                  
     } catch (IOException e) {                                                                        
         LOG.error(                                                                                   
           "Exception while connecting, id: {}, addr: {}, closing learner connection",                
           sid,                                                                                       
           sock.getRemoteSocketAddress(),                                                             
           e);                                                                                        
         closeSocket(sock);                                                                           
     }                                                                                                
 }                                                                                                    

QuorumCnxManager.startConnection

startConnection performs the connection constraint check described above and creates the corresponding SendWorker and ReceiveWorker thread objects (the latter is the RecvWorker class in the code).

 private boolean startConnection(Socket sock, Long sid) throws IOException {      
     // Data output stream
     DataOutputStream dout = null;
     // Data input stream
     DataInputStream din = null;
     LOG.debug("startConnection (myId:{} --> sid:{})", self.getId(), sid);                                    
     try {                                                                                                    
         // Use BufferedOutputStream to reduce the number of IP packets. This is                              
         // important for x-DC scenarios.     
         // Wrap the socket's output stream
         BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());                         
         dout = new DataOutputStream(buf);                                                                    
                                                                                                              
         // Sending id and challenge                                                                          
                                                                                                              
         // First sending the protocol version (in other words - message type).                               
         // For backward compatibility reasons we stick to the old protocol version, unless the MultiAddress  
         // feature is enabled. During rolling upgrade, we must make sure that all the servers can            
         // understand the protocol version we use to avoid multiple partitions. see ZOOKEEPER-3720     
         // The basic handshake data sent when opening a session to another server node
         long protocolVersion = self.isMultiAddressEnabled() ? PROTOCOL_VERSION_V2 : PROTOCOL_VERSION_V1;
         // Send the protocol version
         dout.writeLong(protocolVersion);
         // Send this server's sid
         dout.writeLong(self.getId());
                                                                                                              
         // now we send our election address. For the new protocol version, we can send multiple addresses.   
         Collection<InetSocketAddress> addressesToSend = protocolVersion == PROTOCOL_VERSION_V2               
                 ? self.getElectionAddress().getAllAddresses()                                                
                 : Arrays.asList(self.getElectionAddress().getOne());                                         
                                                                                                              
         String addr = addressesToSend.stream()                                                               
                 .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));                             
         byte[] addr_bytes = addr.getBytes();                                                                 
         dout.writeInt(addr_bytes.length);                                                                    
         dout.write(addr_bytes);                                                                              
         dout.flush();                                                                                        
         // Create the data input stream
         din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));                           
     } catch (IOException e) {                                                                                
         LOG.warn("Ignoring exception reading or writing challenge: ", e);                                    
         closeSocket(sock);                                                                                   
         return false;                                                                                        
     }                                                                                                        
                                                                                                              
     // authenticate learner                                                                                  
     QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);                                             
     if (qps != null) {                                                                                       
         // TODO - investigate why reconfig makes qps null.      
         // If server authentication is configured, authenticate the remote server
         authLearner.authenticate(sock, qps.hostname);                                                        
     }                                                                                                        
                                                                                                              
     // If lost the challenge, then drop the new connection                                                   
     if (sid > self.getId()) {       
         // This is the checkpoint for the socket connection constraint described above
         LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.get
         // If sid > self.getId(), close the socket connection
         closeSocket(sock);
         // Otherwise proceed with the connection                                                             
     } else {                                                                                                 
         LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getI
         // Create the SendWorker from sid and the established socket
         SendWorker sw = new SendWorker(sock, sid);
         // Create the RecvWorker from sid, the socket, and the input stream
         RecvWorker rw = new RecvWorker(sock, din, sid, sw);
         // Let the SendWorker hold a reference to the RecvWorker
         sw.setRecv(rw);
                                                                                                              
         SendWorker vsw = senderWorkerMap.get(sid);                                                           
                                                                                                              
         if (vsw != null) {                                                                                   
             vsw.finish();                                                                                    
         }                                                                                                    
           
         // Register the SendWorker in senderWorkerMap
         senderWorkerMap.put(sid, sw);
         // Initialize the send queue for sid in queueSendMap if it does not exist yet
         queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));
         // Start the SendWorker and the RecvWorker
         sw.start();                                                                                          
         rw.start();                                                                                          
                                                                                                              
         return true;                                                                                         
                                                                                                   
     }                                                                                                        
     return false;                                                                                            
 }                                                                                                            
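
The handshake that startConnection writes can be read as a small wire format: a long protocol version, a long sid, an int length, and then the address bytes. The sketch below round-trips that layout in memory. The class and method names are hypothetical, the version value is a placeholder rather than ZooKeeper's real constant, and UTF-8 is an explicit choice of this sketch (the listing above calls getBytes() with the platform default).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Sketch of the handshake layout: version (long), sid (long), address length (int), address bytes.
final class HandshakeFrameSketch {

    static byte[] encode(long protocolVersion, long sid, String addresses) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(protocolVersion);   // placeholder value, not the real constant
            out.writeLong(sid);
            byte[] addr = addresses.getBytes(StandardCharsets.UTF_8);
            out.writeInt(addr.length);        // length prefix for the variable-size part
            out.write(addr);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in the same order and joins them for display.
    static String decode(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            long version = in.readLong();
            long sid = in.readLong();
            byte[] addr = new byte[in.readInt()];
            in.readFully(addr);
            return version + "/" + sid + "/" + new String(addr, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] frame = encode(1L, 2L, "127.0.0.1:3888");
        System.out.println(decode(frame)); // 1/2/127.0.0.1:3888
    }
}
```

Multiple election addresses are joined with "|" before encoding, as in the listing above, so the receiver only needs one length prefix for the whole address string.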
                                                                                                              
SendWorker

Let's look at how SendWorker works.

 public void run() {                                                        
      threadCnt.incrementAndGet();                                           
      try {                                                                  
          /**                                                                
           * If there is nothing in the queue to send, then we               
           * send the lastMessage to ensure that the last message            
           * was received by the peer. The message could be dropped          
           * in case self or the peer shutdown their connection              
           * (and exit the thread) prior to reading/processing               
           * the last message. Duplicate messages are handled correctly      
           * by the peer.                                                    
           *                                                                 
           * If the send queue is non-empty, then we have a recent           
           * message than that stored in lastMessage. To avoid sending       
           * stale message, we should send the message in the send queue.    
           */ 
          // Look up this SendWorker's send queue in queueSendMap by sid
          BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);              
                if (bq == null || isSendQueueEmpty(bq)) {
                    // On the first pass, if bq is null or empty, resend the message stored in lastMessageSent,
                    // provided one exists. SendWorker always records the most recently sent message there.
                    ByteBuffer b = lastMessageSent.get(sid);
                    if (b != null) {
                        LOG.debug("Attempting to send lastMessage to sid={}", sid);
                        send(b);
                    }
                }
            } catch (IOException e) {
                LOG.error("Failed to send last message. Shutting down thread.", e);
                this.finish();
            }
            LOG.debug("SendWorker thread started towards {}. myId: {}", sid, QuorumCnxManager.this.mySid);

            try {
                // The main loop: keep polling this worker's vote message queue and send whatever it yields
                while (running && !shutdown && sock != null) {

                    ByteBuffer b = null;
                    try {
                        BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                        if (bq != null) {
                            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                        } else {
                            LOG.error("No queue of incoming messages for server {}", sid);
                            break;
                        }

                        if (b != null) {
                            // Record the latest vote message in lastMessageSent
                            lastMessageSent.put(sid, b);
                            // Send the message over the underlying socket
                            send(b);
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for message on queue", e);
                    }
                }
            } catch (Exception e) {
                LOG.warn(
                    "Exception when using channel: for id {} my id = {}",
                    sid ,
                    QuorumCnxManager.this.mySid,
                    e);
            }
            this.finish();

            LOG.warn("Send worker leaving thread id {} my id = {}", sid, self.getId());
        }
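
The two phases of the run method can be isolated in a few lines: replay the last message once when nothing is queued, then record and send each message taken from the queue. The following is a simplified sketch under stated assumptions. The names are hypothetical, a list stands in for the socket, and a non-blocking poll replaces the timed poll used in the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the "resend last message, then drain the queue" behavior; names are hypothetical.
final class ResendLastMessageSketch {
    final Map<Long, String> lastMessageSent = new ConcurrentHashMap<>();
    final List<String> wire = new ArrayList<>(); // stands in for the socket

    // Startup step: with nothing queued, replay the last message so the peer surely has it.
    void onStart(long sid, BlockingQueue<String> queue) {
        if (queue == null || queue.isEmpty()) {
            String last = lastMessageSent.get(sid);
            if (last != null) {
                wire.add(last);
            }
        }
    }

    // One iteration of the main loop: remember the message, then send it.
    void drainOnce(long sid, BlockingQueue<String> queue) {
        String next = queue.poll();
        if (next != null) {
            lastMessageSent.put(sid, next);
            wire.add(next);
        }
    }

    public static void main(String[] args) {
        ResendLastMessageSketch worker = new ResendLastMessageSketch();
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("vote-1");
        worker.drainOnce(3L, queue); // sends vote-1 and records it
        worker.onStart(3L, queue);   // a restarted worker replays vote-1
        System.out.println(worker.wire); // [vote-1, vote-1]
    }
}
```

The replay can deliver a duplicate, which is acceptable because, as the comment in the listing notes, the peer handles duplicate messages correctly.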


ReceiveWorker

Having covered SendWorker's run method, let's turn to ReceiveWorker's run method.

 public void run() {
            threadCnt.incrementAndGet();
            try {
                LOG.debug("RecvWorker thread towards {} started. myId: {}", sid, QuorumCnxManager.this.mySid);
                // Loop, reading messages from the data stream
                while (running && !shutdown && sock != null) {
                    /**
                     * Reads the first int to determine the length of the
                     * message
                     */
                    // ZooKeeper uses a variable-length format for vote messages, so read the message length first
                    int length = din.readInt();
                    if (length <= 0 || length > PACKETMAXSIZE) {
                        throw new IOException("Received packet with invalid packet: " + length);
                    }
                    /**
                     * Allocates a new ByteBuffer to receive the message
                     */
                    final byte[] msgArray = new byte[length];
                    // Read the whole message body according to that length
                    din.readFully(msgArray, 0, length);
                    // Wrap the bytes read in a Message and put it on the RecvQueue to await processing
                    addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
                }
            } catch (Exception e) {
                LOG.warn(
                    "Connection broken for id {}, my id = {}",
                    sid,
                    QuorumCnxManager.this.mySid,
                    e);
            } finally {
                LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
                sw.finish();
                closeSocket(sock);
            }
        }

    }
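
The receive loop is a standard length-prefixed framing reader: read an int, validate it, then read exactly that many bytes. Here is a minimal in-memory sketch of that technique. The names are hypothetical, and the size cap is an assumed value for illustration, not necessarily ZooKeeper's PACKETMAXSIZE.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Sketch of length-prefixed framing; names and the size cap are assumptions.
final class LengthPrefixedReaderSketch {
    static final int MAX_PACKET = 512 * 1024; // assumed cap, for illustration only

    // Writes a 4-byte length followed by the body.
    static byte[] frame(byte[] body) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(body.length);
            out.write(body);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads one frame, rejecting non-positive or oversized lengths before allocating.
    static byte[] readFrame(DataInputStream in) {
        try {
            int length = in.readInt();
            if (length <= 0 || length > MAX_PACKET) {
                throw new IOException("invalid packet length: " + length);
            }
            byte[] body = new byte[length];
            in.readFully(body, 0, length); // blocks until the whole body has arrived
            return body;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[] {1, 2, 3});
        byte[] body = readFrame(new DataInputStream(new ByteArrayInputStream(framed)));
        System.out.println(body.length); // 3
    }
}
```

Validating the length before allocating the array keeps a corrupt or hostile peer from forcing a huge allocation.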
WorkerReceiver

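From the analysis above, vote messages are read and wrapped by ReceiveWorker and then placed on the RecvQueue. The next step is to see how WorkerReceiver consumes the data in RecvQueue, so let's analyze WorkerReceiver's run method.
The method is long, so please bear with it.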


  public void run() {

                Message response;
                // Main loop
                while (!stop) {
                    // Sleeps on receive
                    try {
                        // Try to fetch a vote message from the RecvQueue
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                        if (response == null) {
                            // Nothing arrived within the timeout, so keep polling
                            continue;
                        }
                        // Run a series of validity checks based on the message size
                        final int capacity = response.buffer.capacity();

                        // The current protocol and two previous generations all send at least 28 bytes
                        if (capacity < 28) {
                            LOG.error("Got a short response from server {}: {}", response.sid, capacity);
                            continue;
                        }

                        // this is the backwardCompatibility mode in place before ZK-107
                        // It is for a version of the protocol in which we didn't send peer epoch
                        // With peer epoch and version the message became 40 bytes
                        boolean backCompatibility28 = (capacity == 28);

                        // this is the backwardCompatibility mode for no version information
                        boolean backCompatibility40 = (capacity == 40);

                        response.buffer.clear();

                        // Instantiate Notification and set its attributes
                        Notification n = new Notification();
                        // Extract the fields from the message and use them to build the Notification
                        int rstate = response.buffer.getInt();
                        long rleader = response.buffer.getLong();
                        long rzxid = response.buffer.getLong();
                        long relectionEpoch = response.buffer.getLong();
                        long rpeerepoch;

                        int version = 0x0;
                        QuorumVerifier rqv = null;

                        try {
                            if (!backCompatibility28) {
                                rpeerepoch = response.buffer.getLong();
                                if (!backCompatibility40) {
                                    /*
                                     * Version added in 3.4.6
                                     */

                                    version = response.buffer.getInt();
                                } else {
                                    LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
                                }
                            } else {
                                LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
                                rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                            }

                            // check if we have a version that includes config. If so extract config info from message.
                            if (version > 0x1) {
                                int configLength = response.buffer.getInt();

                                // we want to avoid errors caused by the allocation of a byte array with negative length
                                // (causing NegativeArraySizeException) or huge length (causing e.g. OutOfMemoryError)
                                if (configLength < 0 || configLength > capacity) {
                                    throw new IOException(String.format("Invalid configLength in notification message! sid=%d, capacity=%d, version=%d, configLength=%d",
                                                                        response.sid, capacity, version, configLength));
                                }

                                byte[] b = new byte[configLength];
                                // Read the config data
                                response.buffer.get(b);

                                synchronized (self) {
                                    try {
                                        // Build a QuorumVerifier from the config
                                        rqv = self.configFromString(new String(b));
                                        QuorumVerifier curQV = self.getQuorumVerifier();
                                        if (rqv.getVersion() > curQV.getVersion()) {
                                            LOG.info("{} Received version: {} my version: {}",
                                                     self.getId(),
                                                     Long.toHexString(rqv.getVersion()),
                                                     Long.toHexString(self.getQuorumVerifier().getVersion()));
                                            if (self.getPeerState() == ServerState.LOOKING) {
                                                LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
                                                self.processReconfig(rqv, null, null, false);
                                                if (!rqv.equals(curQV)) {
                                                    LOG.info("restarting leader election");
                                                    self.shuttingDownLE = true;
                                                    self.getElectionAlg().shutdown();

                                                    break;
                                                }
                                            } else {
                                                LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
                                            }
                                        }
                                    } catch (IOException | ConfigException e) {
                                        LOG.error("Something went wrong while processing config received from {}", response.sid);
                                    }
                                }
                            } else {
                                LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
                            }
                        } catch (BufferUnderflowException | IOException e) {
                            LOG.warn("Skipping the processing of a partial / malformed response message sent by sid={} (message length: {})",
                                     response.sid, capacity, e);
                            continue;
                        }
                        /*
                         * If it is from a non-voting server (such as an observer or
                         * a non-voting follower), respond right away.
                         */
                        // If the sender's sid is not a valid voter, reply right away with our current vote
                        if (!validVoter(response.sid)) {
                            Vote current = self.getCurrentVote();
                            QuorumVerifier qv = self.getQuorumVerifier();
                            ToSend notmsg = new ToSend(
                                ToSend.mType.notification,
                                current.getId(),
                                current.getZxid(),
                                logicalclock.get(),
                                self.getPeerState(),
                                response.sid,
                                current.getPeerEpoch(),
                                qv.toString().getBytes());

                            sendqueue.offer(notmsg);
                        } else {
                            // Receive new message
                            LOG.debug("Receive new notification message. My id = {}", self.getId());

                            // State of peer that sent this message
                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                            switch (rstate) {
                            case 0:
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            case 1:
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            case 2:
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            case 3:
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                                break;
                            default:
                                continue;
                            }
                            // Populate the Notification fields with the values extracted from the Message
                            n.leader = rleader;
                            n.zxid = rzxid;
                            n.electionEpoch = relectionEpoch;
                            n.state = ackstate;
                            n.sid = response.sid;
                            n.peerEpoch = rpeerepoch;
                            n.version = version;
                            n.qv = rqv;
                            /*
                             * Print notification info
                             */
                            LOG.info(
                                "Notification: my state:{}; n.sid:{}, n.state:{}, n.leader:{}, n.round:0x{}, "
                                    + "n.peerEpoch:0x{}, n.zxid:0x{}, message format version:0x{}, n.config version:0x{}",
                                self.getPeerState(),
                                n.sid,
                                n.state,
                                n.leader,
                                Long.toHexString(n.electionEpoch),
                                Long.toHexString(n.peerEpoch),
                                Long.toHexString(n.zxid),
                                Long.toHexString(n.version),
                                (n.qv != null ? (Long.toHexString(n.qv.getVersion())) : "0"));

                            /*
                             * If this server is looking, then send proposed leader
                             */
                            // If this node is in the LOOKING state, add the Notification to recvqueue
                            if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                                recvqueue.offer(n);

                                /*
                                 * Send a notification back if the peer that sent this
                                 * message is also looking and its logical clock is
                                 * lagging behind.
                                 */
                                if ((ackstate == QuorumPeer.ServerState.LOOKING)
                                    && (n.electionEpoch < logicalclock.get())) {
                                    // If the sender's election round is behind ours, send our vote back to that sid
                                    Vote v = getVote();
                                    QuorumVerifier qv = self.getQuorumVerifier();
                                    ToSend notmsg = new ToSend(
                                        ToSend.mType.notification,
                                        v.getId(),
                                        v.getZxid(),
                                        logicalclock.get(),
                                        self.getPeerState(),
                                        response.sid,
                                        v.getPeerEpoch(),
                                        qv.toString().getBytes());
                                    sendqueue.offer(notmsg);
                                }
                            } else {
                                /*
                                 * If this server is not looking, but the one that sent the ack
                                 * is looking, then send back what it believes to be the leader.
                                 */
                                // If this node is not LOOKING (a leader has already been elected),
                                // reply only to senders that are still LOOKING
                                Vote current = self.getCurrentVote();
                                if (ackstate == QuorumPeer.ServerState.LOOKING) {
                                    // If this server is the leader, report that the sender is still LOOKING
                                    if (self.leader != null) {
                                        if (leadingVoteSet != null) {
                                            self.leader.setLeadingVoteSet(leadingVoteSet);
                                            leadingVoteSet = null;
                                        }
                                        self.leader.reportLookingSid(response.sid);
                                    }


                                    LOG.debug(
                                        "Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
                                        self.getId(),
                                        response.sid,
                                        Long.toHexString(current.getZxid()),
                                        current.getId(),
                                        Long.toHexString(self.getQuorumVerifier().getVersion()));
                                    // Send the current leader information to the server with that sid
                                    QuorumVerifier qv = self.getQuorumVerifier();
                                    ToSend notmsg = new ToSend(
                                        ToSend.mType.notification,
                                        current.getId(),
                                        current.getZxid(),
                                        current.getElectionEpoch(),
                                        self.getPeerState(),
                                        response.sid,
                                        current.getPeerEpoch(),
                                        qv.toString().getBytes());
                                    sendqueue.offer(notmsg);
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted Exception while waiting for new message", e);
                    }
                }
                LOG.info("WorkerReceiver is down");
            }

        }
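
The length check on the config field near the top of this listing can be isolated as a small sketch. This is not ZooKeeper code: the class name `ConfigFieldSketch` and the helper `readConfig` are hypothetical, and decoding as UTF-8 is an assumption (the listing calls `new String(b)`, which uses the platform default charset). It only illustrates why the length is validated against the message capacity before the byte array is allocated.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not part of ZooKeeper.
class ConfigFieldSketch {

    /**
     * Reads a length-prefixed config string from the buffer.
     * A length that is negative or larger than the whole message is rejected
     * before any array is allocated, mirroring the check in the listing.
     */
    static String readConfig(ByteBuffer buffer, int capacity) throws IOException {
        int configLength = buffer.getInt();
        if (configLength < 0 || configLength > capacity) {
            throw new IOException("Invalid configLength: " + configLength);
        }
        byte[] b = new byte[configLength];
        // Throws BufferUnderflowException if the message is truncated.
        buffer.get(b);
        // Assumption: UTF-8; the original relies on the default charset.
        return new String(b, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        byte[] config = "version=100000000".getBytes(StandardCharsets.UTF_8);
        ByteBuffer ok = ByteBuffer.allocate(4 + config.length);
        ok.putInt(config.length);
        ok.put(config);
        ok.flip();
        System.out.println(readConfig(ok, ok.capacity()));

        ByteBuffer bad = ByteBuffer.allocate(4);
        bad.putInt(-1);
        bad.flip();
        try {
            readConfig(bad, bad.capacity());
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the listing, both the `IOException` from this check and a `BufferUnderflowException` from a truncated message are caught by the same handler, which logs a warning and skips the message.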

That is the WorkerReceiver workflow. WorkerReceiver turns each incoming vote message into a Notification and adds it to recvqueue. QuorumPeer then takes notifications from recvqueue and processes them, using the logic we already analyzed above.
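
The routing decisions WorkerReceiver makes for each message can be condensed into a small decision function. The following is a simplified sketch, not the ZooKeeper implementation: the class `ReceiverRoutingSketch`, the `Action` enum, and the `route` method are hypothetical names introduced for illustration, and the real code builds `ToSend` messages and offers them to `sendqueue` instead of returning a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not part of ZooKeeper.
class ReceiverRoutingSketch {

    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

    enum Action {
        REPLY_TO_NON_VOTER,        // sender is not a valid voter: answer with the current vote
        QUEUE_NOTIFICATION,        // hand the Notification to the election loop (recvqueue)
        REPLY_WITH_PROPOSAL,       // sender lags behind: send back our proposed vote
        REPLY_WITH_ELECTED_LEADER  // election is over here: tell the sender who the leader is
    }

    static List<Action> route(boolean senderIsVoter,
                              ServerState self,
                              ServerState sender,
                              long senderElectionEpoch,
                              long logicalClock) {
        List<Action> actions = new ArrayList<>();
        if (!senderIsVoter) {
            actions.add(Action.REPLY_TO_NON_VOTER);
            return actions;
        }
        if (self == ServerState.LOOKING) {
            // Every notification from a voter is queued while we are electing.
            actions.add(Action.QUEUE_NOTIFICATION);
            if (sender == ServerState.LOOKING && senderElectionEpoch < logicalClock) {
                actions.add(Action.REPLY_WITH_PROPOSAL);
            }
        } else if (sender == ServerState.LOOKING) {
            actions.add(Action.REPLY_WITH_ELECTED_LEADER);
        }
        return actions;
    }

    public static void main(String[] args) {
        // Both sides are electing, but the sender is one round behind.
        System.out.println(route(true, ServerState.LOOKING, ServerState.LOOKING, 1L, 2L));
        // We already follow a leader and a restarted peer asks who leads.
        System.out.println(route(true, ServerState.FOLLOWING, ServerState.LOOKING, 1L, 5L));
        // An observer sends a notification.
        System.out.println(route(false, ServerState.LOOKING, ServerState.OBSERVING, 1L, 1L));
    }
}
```

Note that when neither side is LOOKING, the function returns an empty list, which matches the listing: the message is dropped without a reply.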

End

This completes our source-code analysis of the ZooKeeper leader election process.

© Copyright belongs to the author. For reprints or content partnerships, please contact the author.