Understanding the ZooKeeper Source Code (7): Leader Election

This article was first published on 泊浮目's Jianshu page: http://www.reibang.com/u/204b8aaab8ba

Version  Date       Notes
1.0      2020.6.14  First published
1.1      2020.8.16  Layout improvements
1.2      2020.8.21  Wording improvements
1.3      2021.6.23  Title changed from 《深入淺出Zookeeper(七):Leader選舉》 to 《深入淺出Zookeeper源碼(七):Leader選舉》

1. Preface

對(duì)于一個(gè)分布式集群來(lái)說(shuō),保證數(shù)據(jù)寫(xiě)入一致性最簡(jiǎn)單的方式就是依靠一個(gè)節(jié)點(diǎn)來(lái)調(diào)度和管理其他節(jié)點(diǎn)。在分布式系統(tǒng)中我們一般稱其為L(zhǎng)eader摊趾。

為什么是最簡(jiǎn)單的方式呢弄跌?我們想象一下磷账,當(dāng)我們寫(xiě)數(shù)據(jù)到Leader時(shí)缝裁,Leader寫(xiě)入自己的一份數(shù)據(jù)后,可能會(huì)做副本到Follower治拿,那么拷貝的數(shù)量摩泪、及所在的位置都由該Leader來(lái)控制。但如果是多Leader調(diào)度劫谅,就要涉及到數(shù)據(jù)分區(qū)见坑,請(qǐng)求負(fù)載均衡等問(wèn)題了。

In this article, let's walk through ZK's election process together.

2. Election Algorithm Analysis: ZAB

ZAB is a typical majority-vote algorithm, and as the name suggests it was born for ZK (ZooKeeper Atomic Broadcast). Its leader election mainly looks at a node's server ID and its data ID; a larger data ID means newer data, which takes priority in becoming the leader.

2.1 選舉時(shí)機(jī)

常見(jiàn)由兩種場(chǎng)景觸發(fā)選舉项棠,無(wú)論如何悲雳,至少得有兩臺(tái)ZK機(jī)器。

  • Startup trigger: as we know, every ZK server must be configured with a different myid, and at the very beginning every zxid is necessarily 0. This means the server with the largest myid among the voters ends up being chosen as the leader.
  • Leader-loss trigger: a ZK node updates its zxid every time it processes a transaction, so newer data means a larger zxid. In this election, the node with the largest zxid is chosen as the leader.

2.2 ZK Election Process (with Source Code Analysis)

The core methods are org.apache.zookeeper.server.quorum.QuorumPeer.startLeaderElection and org.apache.zookeeper.server.quorum.QuorumPeer.run; our source-code walkthrough is organized around them.

The source code analysis below is based on version 3.5.7.

2.2.1 Startup

We start from QuorumPeerMain, since it is the startup entry point:

/**
 *
 * <h2>Configuration file</h2>
 *
 * When the main() method of this class is used to start the program, the first
 * argument is used as a path to the config file, which will be used to obtain
 * configuration information. This file is a Properties file, so keys and
 * values are separated by equals (=) and the key/value pairs are separated
 * by new lines. The following is a general summary of keys used in the
 * configuration file. For full details on this see the documentation in
 * docs/index.html
 * <ol>
 * <li>dataDir - The directory where the ZooKeeper data is stored.</li>
 * <li>dataLogDir - The directory where the ZooKeeper transaction log is stored.</li>
 * <li>clientPort - The port used to communicate with clients.</li>
 * <li>tickTime - The duration of a tick in milliseconds. This is the basic
 * unit of time in ZooKeeper.</li>
 * <li>initLimit - The maximum number of ticks that a follower will wait to
 * initially synchronize with a leader.</li>
 * <li>syncLimit - The maximum number of ticks that a follower will wait for a
 * message (including heartbeats) from the leader.</li>
 * <li>server.<i>id</i> - This is the host:port[:port] that the server with the
 * given id will use for the quorum protocol.</li>
 * </ol>
 * In addition to the config file. There is a file in the data directory called
 * "myid" that contains the server id as an ASCII decimal value.
 *
 */
@InterfaceAudience.Public
public class QuorumPeerMain {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);

    private static final String USAGE = "Usage: QuorumPeerMain configfile";

    protected QuorumPeer quorumPeer;

    /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            System.exit(2);
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            System.exit(2);
        } catch (DatadirException e) {
            LOG.error("Unable to access datadir, exiting abnormally", e);
            System.err.println("Unable to access datadir, exiting abnormally");
            System.exit(3);
        } catch (AdminServerException e) {
            LOG.error("Unable to start AdminServer, exiting abnormally", e);
            System.err.println("Unable to start AdminServer, exiting abnormally");
            System.exit(4);
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            System.exit(1);
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }

    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

    public void runFromConfig(QuorumPeerConfig config)
            throws IOException, AdminServerException
    {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }

      LOG.info("Starting quorum peer");
      try {
          ServerCnxnFactory cnxnFactory = null;
          ServerCnxnFactory secureCnxnFactory = null;

          if (config.getClientPortAddress() != null) {
              cnxnFactory = ServerCnxnFactory.createFactory();
              cnxnFactory.configure(config.getClientPortAddress(),
                      config.getMaxClientCnxns(),
                      false);
          }

          if (config.getSecureClientPortAddress() != null) {
              secureCnxnFactory = ServerCnxnFactory.createFactory();
              secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                      config.getMaxClientCnxns(),
                      true);
          }

          quorumPeer = getQuorumPeer();
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      config.getDataLogDir(),
                      config.getDataDir()));
          quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
          quorumPeer.enableLocalSessionsUpgrading(
              config.isLocalSessionsUpgradingEnabled());
          //quorumPeer.setQuorumPeers(config.getAllMembers());
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setConfigFileName(config.getConfigFilename());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
          if (config.getLastSeenQuorumVerifier()!=null) {
              quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
          }
          quorumPeer.initConfigInZKDatabase();
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
          quorumPeer.setSslQuorum(config.isSslQuorum());
          quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          if (config.sslQuorumReloadCertFiles) {
              quorumPeer.getX509Util().enableCertFileReloading();
          }

          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){
              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }
          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();
          
          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

    // @VisibleForTesting
    protected QuorumPeer getQuorumPeer() throws SaslException {
        return new QuorumPeer();
    }
}

Following the chain QuorumPeerMain.main() -> main.initializeAndRun(args) -> runFromConfig -> quorumPeer.start(), we move on to QuorumPeer.java (the class that manages the election-related logic):

    @Override
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
         }
        loadDataBase();
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        startLeaderElection();
        super.start();
    }

現(xiàn)在款青,我們來(lái)到核心代碼startLeaderElection

    synchronized public void startLeaderElection() {
       try {
           if (getPeerState() == ServerState.LOOKING) {
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       } catch(IOException e) {
           RuntimeException re = new RuntimeException(e.getMessage());
           re.setStackTrace(e.getStackTrace());
           throw re;
       }

       // if (!getView().containsKey(myid)) {
      //      throw new RuntimeException("My id " + myid + " not in the peer list");
        //}
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(getQuorumAddress().getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType);
    }

邏輯非常的簡(jiǎn)單做修,如果處于Looking狀態(tài)(服務(wù)器剛啟動(dòng)時(shí)默認(rèn)為L(zhǎng)ooking),那么就發(fā)起選舉的投票抡草,并確認(rèn)選舉算法(從3.4.0開(kāi)始饰及,只有FastLeaderElection選舉算法了),并將其發(fā)送出去渠牲。由于代碼篇幅較大旋炒,這里不再粘出,感興趣的讀者可以自行閱讀FastLeaderElection.Messenger.WorkerReceiver.run签杈。其本質(zhì)上就是一個(gè)線程瘫镇,從存儲(chǔ)選票的隊(duì)列中取出vote,并發(fā)送答姥。

A quick primer on the server states (a minimal enum sketch follows the list):

  1. LOOKING: looking for a Leader. A server in this state believes the cluster currently has no Leader.
  2. FOLLOWING: the server's current role is Follower.
  3. LEADING: the server's current role is Leader.
  4. OBSERVING: the server's current role is Observer.
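
These four states map to an enum inside QuorumPeer. A minimal sketch (not the verbatim source) looks like this:

// Sketch of the peer states used during election; the real enum is
// org.apache.zookeeper.server.quorum.QuorumPeer.ServerState.
public enum ServerState {
    LOOKING,    // no leader known; the peer is electing
    FOLLOWING,  // the peer is acting as a Follower
    LEADING,    // the peer is acting as the Leader
    OBSERVING   // the peer is an Observer and does not vote
}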

接下來(lái)看QuorumPeer的相關(guān)核心代碼:

    @Override
    public void run() {
        updateThreadName();

        LOG.debug("Starting quorum peer");
        try {
            jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for(QuorumServer s: getView().values()){
                ZKMBeanInfo p;
                if (getId() == s.id) {
                    p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {
                    RemotePeerBean rBean = new RemotePeerBean(this, s);
                    try {
                        MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                        jmxRemotePeerBean.put(s.id, rBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }

        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk =
                            new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
    
                        // Instead of starting roZk immediately, wait some grace
                        // period before we decide we're partitioned.
                        //
                        // Thread is used here because otherwise it would require
                        // changes in each of election strategy classes which is
                        // unnecessary code coupling.
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                startLeaderElection();
                            }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                           reconfigFlagClear();
                            if (shuttingDownLE) {
                               shuttingDownLE = false;
                               startLeaderElection();
                               }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }                        
                    }
                    break;

Only the LOOKING-related logic is excerpted here. The if branch in the upper half mainly deals with the read-only server, which is used to handle read-only clients. The else branch is the usual case, but looking at this code block:

    reconfigFlagClear();
    if (shuttingDownLE) {
        shuttingDownLE = false;
        startLeaderElection();
    }
    setCurrentVote(makeLEStrategy().lookForLeader());

其實(shí)區(qū)別不大盔腔。接著來(lái)看lookForLeader,為了篇幅,我們只截取Looking相關(guān)的代碼:

    /**
     * Starts a new round of leader election. Whenever our QuorumPeer
     * changes its state to LOOKING, this method is invoked, and it
     * sends notifications to all other peers.
     */
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
           self.start_fle = Time.currentElapsedTime();
        }
        try {
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
                logicalclock.incrementAndGet();
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);


注釋說(shuō)的很清楚,這個(gè)方法會(huì)開(kāi)啟新的一輪選舉:當(dāng)我們的服務(wù)器狀態(tài)變?yōu)長(zhǎng)ooking弛随,這個(gè)方法會(huì)被調(diào)用瓢喉,被通知集群其他需要參與選舉的服務(wù)器。那么在這段邏輯中舀透,recvqueue會(huì)存放著相關(guān)的選舉通知信息栓票,取出一個(gè)。接下來(lái)有兩個(gè)邏輯分支:

  1. It is null: find a way to notify the other servers again.
  2. It is a valid vote (i.e., everyone is in the same election round): the votes are then compared head-to-head (the "vote PK").
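
The overall shape of that loop can be condensed into the following self-contained sketch. It only illustrates the control flow described above; the timeout values, field names, and println placeholders are made up, and the real vote comparison is shown next:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative only: the shape of the lookForLeader() loop -- poll the
// notification queue, then branch on "nothing received" vs. "got a vote".
public class LookForLeaderSketch {
    static final class Notification {
        long leader, zxid, electionEpoch;
    }

    private final LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<>();
    private volatile boolean looking = true;

    void loop() throws InterruptedException {
        long notTimeout = 200;          // made-up initial poll timeout (ms)
        final long maxTimeout = 60_000; // cap on the back-off
        while (looking) {
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            if (n == null) {
                // Branch 1: nothing received -- (re)notify the other servers and back off.
                System.out.println("no notification, resending our own vote");
                notTimeout = Math.min(notTimeout * 2, maxTimeout);
            } else {
                // Branch 2: a vote from the current round -- compare it with our proposal
                // (totalOrderPredicate) and check for a quorum (termPredicate).
                System.out.println("comparing vote for proposed leader " + n.leader);
            }
        }
    }
}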

我們來(lái)看totalOrderPredicate這個(gè)方法:

    /**
     * Check if a pair (server id, zxid) succeeds our
     * current vote.
     *
     * @param id    Server identifier
     * @param zxid  Last zxid observed by the issuer of this vote
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */

        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

Let's lay out the logic (a small worked example follows the list):

  1. If the new vote's election epoch is greater than that of our internal vote, the vote must be changed.
  2. If the election epochs are equal and the external vote's ZXID is greater than the internal one's, the vote must be changed.
  3. If the election epochs (and ZXIDs) are equal and the external vote's SID is greater than the internal one's, the vote must be changed.
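
To make the ordering concrete, here is a standalone copy of the same comparison plus a few sample votes; the numbers are made up purely for illustration:

// Standalone copy of the (epoch, zxid, server id) ordering used by
// totalOrderPredicate, plus a tiny worked example with made-up values.
public class VoteOrderExample {
    static boolean newVoteWins(long newId, long newZxid, long newEpoch,
                               long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid
                        || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        // Same epoch, the external vote carries a newer zxid -> adopt it.
        System.out.println(newVoteWins(1, 0x200, 5, 3, 0x100, 5)); // true
        // Same epoch and zxid, external server id is smaller -> keep our own vote.
        System.out.println(newVoteWins(1, 0x100, 5, 3, 0x100, 5)); // false
        // External vote comes from a later election epoch -> adopt it.
        System.out.println(newVoteWins(1, 0x050, 6, 3, 0x100, 5)); // true
    }
}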

經(jīng)過(guò)這個(gè)邏輯,便可以確定外部投票優(yōu)于內(nèi)部投票——即更適合成為L(zhǎng)eader越败。這時(shí)便會(huì)把外部選票信息來(lái)覆蓋內(nèi)部投票,并發(fā)送出去:

                    case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();

接下來(lái)就會(huì)判斷集群中是否有過(guò)半的服務(wù)器認(rèn)可該投票置谦。

    /**
     * Termination predicate. Given a set of votes, determines if have
     * sufficient to declare the end of the election round.
     * 
     * @param votes
     *            Set of votes
     * @param vote
     *            Identifier of the vote received last
     */
    protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        /*
         * First make the views consistent. Sometimes peers will have different
         * zxids for a server depending on timing.
         */
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                voteSet.addAck(entry.getKey());
            }
        }

        return voteSet.hasAllQuorums(); // has more than half acknowledged?
    }

Otherwise, votes keep being collected.
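
For the common majority-based quorum, hasAllQuorums() boils down to "more than half of the voting members acked the same vote". A simplified sketch of that rule (ignoring weights and the hierarchical quorum configurations ZooKeeper also supports) would be:

import java.util.Set;

// Simplified majority check: have more than half of the voting members
// acknowledged our proposed vote? (In ZooKeeper the real check sits behind
// the QuorumVerifier, e.g. QuorumMaj.containsQuorum.)
public class MajorityQuorumSketch {
    static boolean containsQuorum(Set<Long> ackers, Set<Long> votingMembers) {
        return ackers.size() * 2 > votingMembers.size();
    }

    public static void main(String[] args) {
        Set<Long> members = Set.of(1L, 2L, 3L, 4L, 5L);                  // a 5-node ensemble
        System.out.println(containsQuorum(Set.of(1L, 2L), members));     // false: 2 of 5
        System.out.println(containsQuorum(Set.of(1L, 2L, 3L), members)); // true: 3 of 5
    }
}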

接下來(lái)便是更新服務(wù)器狀態(tài)葵擎。

                         /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, logicalclock.get(), 
                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }

2.2.2 Leader Loss

Earlier we mentioned QuorumPeer.java and its main loop: each role does its own work inside that loop until the peer exits. Here we take the Follower as an example:

                case FOLLOWING:
                    try {
                       LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                       LOG.warn("Unexpected exception",e);
                    } finally {
                       follower.shutdown();
                       setFollower(null);
                       updateServerState();
                    }
                    break;

follower.followLeader()

    /**
     * the main method called by the follower to follow the leader
     *
     * @throws InterruptedException
     */
    void followLeader() throws InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        long electionTimeTaken = self.end_fle - self.start_fle;
        self.setElectionTimeTaken(electionTimeTaken);
        LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
                QuorumPeer.FLE_TIME_UNIT);
        self.start_fle = 0;
        self.end_fle = 0;
        fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
        try {
            QuorumServer leaderServer = findLeader();            
            try {
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                if (self.isReconfigStateChange())
                   throw new Exception("learned about role change");
                //check to see if the leader zxid is lower than ours
                //this should never happen but is just a safety check
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                syncWithLeader(newEpochZxid);                
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }
            } catch (Exception e) {
                LOG.warn("Exception when following the leader", e);
                try {
                    sock.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
    
                // clear pending revalidations
                pendingRevalidations.clear();
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        }
    }

Jump to the core method processPacket:

   /**
     * Examine the packet received in qp and dispatch based on its contents.
     * @param qp
     * @throws IOException
     */
    protected void processPacket(QuorumPacket qp) throws Exception{
        switch (qp.getType()) {
        case Leader.PING:            
            ping(qp);            
            break;
        case Leader.PROPOSAL:           
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            
            if (hdr.getType() == OpCode.reconfig){
               SetDataTxn setDataTxn = (SetDataTxn) txn;       
               QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
               self.setLastSeenQuorumVerifier(qv, true);                               
            }
            
            fzk.logRequest(hdr, txn);
            break;
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;
            
        case Leader.COMMITANDACTIVATE:
           // get the new configuration from the request
           Request request = fzk.pendingTxns.element();
           SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();                                                                                                      
           QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));                                
 
           // get new designated leader from (current) leader's message
           ByteBuffer buffer = ByteBuffer.wrap(qp.getData());    
           long suggestedLeaderId = buffer.getLong();
            boolean majorChange = 
                   self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
           // commit (writes the new config to ZK tree (/zookeeper/config)                     
           fzk.commit(qp.getZxid());
            if (majorChange) {
               throw new Exception("changes proposed in reconfig");
           }
           break;
        case Leader.UPTODATE:
            LOG.error("Received an UPTODATE message after Follower started");
            break;
        case Leader.REVALIDATE:
            revalidate(qp);
            break;
        case Leader.SYNC:
            fzk.sync();
            break;
        default:
            LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
            break;
        }
    }

In the COMMITANDACTIVATE case, we can see that when the follower receives a message about a leader change, it throws an exception. It then transitions back to the LOOKING state and starts an election.

So how do we determine that the leader is unavailable? The answer is heartbeats (PING packets). If no heartbeat arrives from the leader within a certain amount of time, the leader is considered unavailable.

見(jiàn)LeanerHandler.run里的case Leader.PING

                case Leader.PING:
                    // Process the touches
                    ByteArrayInputStream bis = new ByteArrayInputStream(qp
                            .getData());
                    DataInputStream dis = new DataInputStream(bis);
                    while (dis.available() > 0) {
                        long sess = dis.readLong();
                        int to = dis.readInt();
                        leader.zk.touch(sess, to);
                    }
                    break;
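
On the follower side this failure detection ultimately rests on a socket read deadline: if the leader stops sending anything (including PINGs) for too long, the blocking read times out, the resulting exception breaks out of the followLeader() loop, and the peer falls back to LOOKING. The sketch below only illustrates that idea; the timeout arithmetic and method names are assumptions, not ZooKeeper's exact wiring:

import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Illustration only: detect a silent leader via a socket read timeout.
// tickTime/syncLimit mirror the config keys; the concrete wiring in ZooKeeper differs.
public class LeaderLivenessSketch {
    public static void followLoop(Socket leaderSocket, int tickTime, int syncLimit) {
        try {
            // If the leader sends nothing (not even a PING) within this window,
            // read() throws SocketTimeoutException.
            leaderSocket.setSoTimeout(tickTime * syncLimit);
            byte[] buf = new byte[4096];
            while (true) {
                int n = leaderSocket.getInputStream().read(buf);
                if (n < 0) {
                    break; // leader closed the connection
                }
                // ... process the packet (PING, PROPOSAL, COMMIT, ...) ...
            }
        } catch (SocketTimeoutException e) {
            System.out.println("leader silent for too long, going back to LOOKING");
        } catch (IOException e) {
            System.out.println("connection to leader lost, going back to LOOKING");
        }
        // the caller would now set the peer state to LOOKING and start an election
    }
}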

3. Other Common Election Algorithms

First of all, an election algorithm is in essence a consensus algorithm, and the vast majority of consensus algorithms were created to solve data consistency in distributed environments. In ZK, "leader" and "follower" are ultimately just states: a node is only effectively the leader when, within ZK's semantics (its context), everyone agrees that it is the leader — that agreement is the consensus.

常見(jiàn)的共識(shí)算法都有哪些呢?現(xiàn)階段的共識(shí)算法主要可以分成三大類(lèi):公鏈痹届,聯(lián)盟鏈和私鏈呻待。下面描述這三種類(lèi)別的特征:

  • Private chain: the consensus algorithms of private chains are the ones used in traditional distributed systems before the blockchain concept became popular, for example ZooKeeper's ZAB protocol, which is a Paxos-like algorithm. Private chains are generally used where you only need to tolerate nodes failing due to system or network faults, not malicious nodes.
  • Consortium chain: the classic representative is the Fabric project under the Hyperledger umbrella; Fabric 0.6 used the PBFT algorithm. Consortium chains must tolerate not only faulty nodes but also malicious nodes in the cluster, and every node that joins must be verified and vetted.
  • Public chain: like consortium chains, public chains must handle both faulty and malicious nodes. The biggest difference is that nodes in a public chain can join and leave freely, without strict verification or auditing.

Quoted from https://zhuanlan.zhihu.com/p/35847127; author: 美圖技術團隊 (Meitu engineering team).

For reasons of length, we'll briefly introduce two fairly typical consensus algorithms below.

3.1 Raft

Raft is a typical majority-vote election algorithm. Its election mechanism resembles the democratic voting we use in everyday life, with the core idea of "the minority yields to the majority". In other words, in Raft the node that receives the most votes becomes the leader.

With Raft-based election, cluster nodes take on one of three roles:

  • Leader, the primary node; at any moment there is only one Leader, which coordinates and manages the other nodes;
  • Candidate, a candidate for leadership; any node may become a Candidate, and only a node in this role can be elected as the new Leader;
  • Follower, a follower of the Leader, which cannot initiate an election.

The Raft election flow can be broken into the following steps (a minimal sketch of the vote-granting rule follows the list):

  1. Initially, all nodes are in the Follower state.
  2. When an election starts, nodes switch from Follower to Candidate and send vote requests to the other nodes.
  3. The other nodes reply, in the order the requests arrive, whether they agree to the sender becoming leader. Note that in each election round a node may cast only one vote.
  4. If a candidate receives more than half of the votes, it becomes the leader and its state changes to Leader; the other nodes drop from Candidate back to Follower. Leader and Follower nodes then exchange periodic heartbeats to detect whether the leader is still alive.
  5. When the Leader's term ends, i.e., it notices the other servers starting the next election round (or the leader crashes), the Leader steps down to Follower and a new round of election begins.

這個(gè)算法比起ZAB狭吼,較易實(shí)現(xiàn)刁笙,但由于消息通信量大,相比于ZAB,更適用于中小的場(chǎng)景嫡纠。

3.2 PoW

The PoW (Proof of Work) algorithm has each node or server compete for the right to record the next block using its computing power ("hash power"); it is a consensus algorithm built on a proof-of-work mechanism. In other words, the stronger a node's computing power (the faster it solves the puzzle), the more likely it is to win the right to record.

For example, when a transaction happens, three nodes (A, B, and C) all receive the bookkeeping request. Suppose node A solves the puzzle first: it then notifies B and C to verify the result. The puzzle is deliberately asymmetric — finding a solution is far slower than verifying one. Once the other nodes have verified it, the record is committed.
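
In hash-based PoW the "puzzle" is typically: find a nonce such that the hash of the block data plus the nonce starts with a required number of zeros. The toy sketch below uses SHA-256 to show the asymmetry mentioned above — searching for the nonce is expensive, while checking a claimed nonce is a single hash:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Toy hash-based proof of work: mine() searches for a nonce, verify() checks one.
public class PowSketch {
    static String sha256Hex(String s) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("SHA-256")
                                     .digest(s.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    // Expensive side: try nonces until the hash has `difficulty` leading zeros.
    static long mine(String data, int difficulty) throws NoSuchAlgorithmException {
        String prefix = "0".repeat(difficulty);
        for (long nonce = 0; ; nonce++) {
            if (sha256Hex(data + nonce).startsWith(prefix)) {
                return nonce;
            }
        }
    }

    // Cheap side: verifying a claimed nonce is a single hash.
    static boolean verify(String data, long nonce, int difficulty) throws NoSuchAlgorithmException {
        return sha256Hex(data + nonce).startsWith("0".repeat(difficulty));
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        String tx = "tx: A -> B, amount 10";
        long nonce = mine(tx, 4); // difficulty 4 keeps the demo fast
        System.out.println("nonce=" + nonce + ", valid=" + verify(tx, nonce, 4));
    }
}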

聽(tīng)起來(lái)很公平崩侠。但PoW 機(jī)制每次達(dá)成共識(shí)需要全網(wǎng)共同參與運(yùn)算,增加了每個(gè)節(jié)點(diǎn)的計(jì)算量僧家,并且如果題目過(guò)難,會(huì)導(dǎo)致計(jì)算時(shí)間長(zhǎng)、資源消耗多 ;而如果題目過(guò)于簡(jiǎn)單枷邪,會(huì)導(dǎo)致大量節(jié)點(diǎn)同時(shí)獲得記賬權(quán)诺凡,沖突多东揣。這些問(wèn)題,都會(huì)增加達(dá)成共識(shí)的時(shí)間腹泌。

4. Summary

In this article we first covered ZooKeeper's leader election; the rough flow is as follows.

4.1 服務(wù)器啟動(dòng)時(shí)選舉

  1. 每個(gè)Server會(huì)發(fā)出一個(gè)投票
  2. 接受來(lái)每個(gè)Server的投票
  3. 處理投票(對(duì)比zxid和myid)
  4. 統(tǒng)計(jì)投票凉袱,直到超過(guò)半數(shù)的機(jī)器收到相同的投票信息
  5. 更改服務(wù)器角色

4.2 服務(wù)器運(yùn)行期間選舉

服務(wù)器啟動(dòng)時(shí)選舉非常的像芥吟,無(wú)非就是多了一個(gè)狀態(tài)變更——當(dāng)Leader掛了,余下的Follower都會(huì)將自己的服務(wù)器狀態(tài)變更為L(zhǎng)OOKING专甩,然后進(jìn)入選舉流程钟鸵。

4.3 Consistency Algorithms vs. Consensus Algorithms

We also mentioned the notions of consistency algorithms and consensus algorithms. So what is the difference between consistency and consensus? In everyday usage the two are often conflated, so let's make them clear here:

  • Consistency: among the multiple nodes of a distributed system, given a sequence of operations and under the guarantees of an agreed protocol, the data or state presented to the outside world is consistent.
  • Consensus: the process by which the multiple nodes of a distributed system reach agreement on some state.

In other words, consistency is about the outcome, while consensus is about the process of reaching agreement; consensus algorithms are the core technique for guaranteeing that a system satisfies some level of consistency.

Therefore, taking this article together with the previous one, ZAB should be classified as a consensus algorithm.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末徘熔,一起剝皮案震驚了整個(gè)濱河市蓉媳,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌劳景,老刑警劉巖具被,帶你破解...
    沈念sama閱讀 211,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件曾棕,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡囊拜,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)穴亏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)严肪,“玉大人充蓝,你說(shuō)我怎么就攤上這事陨帆。” “怎么了疲牵?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,435評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵承二,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我纲爸,道長(zhǎng)亥鸠,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,509評(píng)論 1 284
  • 正文 為了忘掉前任缩焦,我火速辦了婚禮读虏,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘袁滥。我一直安慰自己盖桥,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布题翻。 她就那樣靜靜地躺著揩徊,像睡著了一般。 火紅的嫁衣襯著肌膚如雪嵌赠。 梳的紋絲不亂的頭發(fā)上塑荒,一...
    開(kāi)封第一講書(shū)人閱讀 49,837評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音姜挺,去河邊找鬼齿税。 笑死,一個(gè)胖子當(dāng)著我的面吹牛炊豪,可吹牛的內(nèi)容都是我干的凌箕。 我是一名探鬼主播,決...
    沈念sama閱讀 38,987評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼词渤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼牵舱!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起缺虐,我...
    開(kāi)封第一講書(shū)人閱讀 37,730評(píng)論 0 267
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤芜壁,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體慧妄,經(jīng)...
    沈念sama閱讀 44,194評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡顷牌,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了塞淹。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片韧掩。...
    茶點(diǎn)故事閱讀 38,664評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖窖铡,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情坊谁,我是刑警寧澤费彼,帶...
    沈念sama閱讀 34,334評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站口芍,受9級(jí)特大地震影響箍铲,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜鬓椭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評(píng)論 3 313
  • 文/蒙蒙 一颠猴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧小染,春花似錦翘瓮、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,764評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至踊赠,卻和暖如春呵扛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背筐带。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,997評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工今穿, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人伦籍。 一個(gè)月前我還...
    沈念sama閱讀 46,389評(píng)論 2 360
  • 正文 我出身青樓蓝晒,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親鸽斟。 傳聞我的和親對(duì)象是個(gè)殘疾皇子拔创,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評(píng)論 2 349