zk Leader Election

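Mention zk and most people picture an architecture with one leader plus several followers and observers. This article analyzes the source code of zk's leader election in two parts: the first covers the election flow, and the second covers the core logic of the election algorithm, that is, zk's own implementation of it. Follow the election flow together with the two diagrams below.
First, look at the two election diagrams: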


[Figure 1: zk election diagram]

[Figure 2: zk election diagram 2]

Explanation of the first diagram:

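  • QuorumPeer: drives the election
  • Messenger: the class that actually sends and receives election messages. (ZooKeeper has offered three leader election algorithms: LeaderElection, a UDP-based FastLeaderElection and a TCP-based FastLeaderElection. Starting with 3.4.0 the first two are deprecated and only the TCP-based FastLeaderElection is kept, so the Messenger here is FastLeaderElection.Messenger.)
  • WorkerReceiver and WorkerSender: the receive and send threads inside Messenger
  • QuorumCnxManager: the socket layer that exchanges election messages. It manages one connection per peer over plain blocking sockets, gives each connection its own SendWorker/RecvWorker thread pair, and funnels incoming messages from all peers into a single recvQueue, handled first come, first served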

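The second diagram is explained in more detail as we follow the code.

If anything below is unclear, refer back to these two diagrams and it should become clear.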

Election Flow

Every server runs a QuorumPeer thread, which drives the whole election. QuorumPeer is a thread class, started from QuorumPeerMain.
Startup call sequence:
QuorumPeerMain#main->QuorumPeerMain#initializeAndRun->QuorumPeerMain#runFromConfig (cluster) / ZooKeeperServerMain#main (standalone)
We follow the cluster path: after loading its configuration, QuorumPeerMain#runFromConfig starts the QuorumPeer thread, so let's go straight to QuorumPeer's run method.

    //Main thread: runs the QuorumPeer loop, switching among FastLeaderElection, Leader, Follower and Observer
    @Override
    public void run() {
        updateThreadName();

        LOG.debug("Starting quorum peer");
        try {
            jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for(QuorumServer s: getView().values()){
                ZKMBeanInfo p;
                if (getId() == s.id) {
                    p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {
                    RemotePeerBean rBean = new RemotePeerBean(this, s);
                    try {
                        MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                        jmxRemotePeerBean.put(s.id, rBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }

        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk =
                            new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
    
                        // Instead of starting roZk immediately, wait some grace
                        // period before we decide we're partitioned.
                        //
                        // Thread is used here because otherwise it would require
                        // changes in each of election strategy classes which is
                        // unnecessary code coupling.
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                //--Initialize the threads and queues used during the election
                                startLeaderElection();
                            }
                            //--Entry point of the election logic
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                startLeaderElection();
                            }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }                        
                    }
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        //6--Start the Observer
                        //6--Works like a follower: register with the leader, sync transactions, then handle packets in processPacket
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );
                    } finally {
                        observer.shutdown();
                        setObserver(null);  
                        updateServerState();
                    }
                    break;
                case FOLLOWING: //6--A Follower first connects to the leader and syncs the transaction history; only then does it start its ZooKeeperServer to serve clients
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        //6--Register with the Leader: connectToLeader->registerWithLeader->syncWithLeader
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        updateServerState();
                    }
                    break;
                case LEADING: //6--The election is over and this peer has confirmed it is the leader
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        //6--Run the leader's actual logic
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        updateServerState();
                    }
                    break;
                }
                start_fle = Time.currentElapsedTime();
            }
        } finally {
            LOG.warn("QuorumPeer main thread exited");
            MBeanRegistry instance = MBeanRegistry.getInstance();
            instance.unregister(jmxQuorumBean);
            instance.unregister(jmxLocalPeerBean);

            for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
                instance.unregister(remotePeerBean);
            }

            jmxQuorumBean = null;
            jmxLocalPeerBean = null;
            jmxRemotePeerBean = null;
        }
    }

QuorumPeer has four working modes:

  • LOOKING: election mode; runs FastLeaderElection
  • LEADING: leader mode; runs the Leader
  • FOLLOWING: follower mode; runs the Follower
  • OBSERVING: observer mode; runs the Observer

Let's start with the call sequence in election mode:
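To make the main loop in run() concrete, here is a minimal sketch of the mode switching. It is not ZooKeeper code: `PeerState` and `PeerLoopSketch` are hypothetical names, and each election outcome is scripted through a deque instead of coming from `lookForLeader()`. It shows the property the `finally` blocks in run() provide: when a FOLLOWING, LEADING or OBSERVING role ends, the peer normally drops back into LOOKING and starts another election.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of QuorumPeer's main loop; not ZooKeeper code.
enum PeerState { LOOKING, FOLLOWING, LEADING, OBSERVING }

final class PeerLoopSketch {
    // electionResults stands in for lookForLeader(): each LOOKING round takes the
    // next scripted outcome, or stays LOOKING when none is available.
    static List<PeerState> run(Deque<PeerState> electionResults, int maxIterations) {
        List<PeerState> trace = new ArrayList<>();
        PeerState state = PeerState.LOOKING;
        for (int i = 0; i < maxIterations; i++) {
            trace.add(state);
            switch (state) {
                case LOOKING:
                    PeerState next = electionResults.poll();
                    state = (next == null) ? PeerState.LOOKING : next;
                    break;
                default:
                    // followLeader()/lead()/observeLeader() block until the role ends
                    // (e.g. the leader is lost); the finally block then calls
                    // updateServerState(), which normally puts the peer back into LOOKING.
                    state = PeerState.LOOKING;
                    break;
            }
        }
        return trace;
    }
}
```

For example, scripting the results FOLLOWING then LEADING over five iterations yields the trace LOOKING, FOLLOWING, LOOKING, LEADING, LOOKING.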
QuorumPeer#startLeaderElection->QuorumPeer#createElectionAlgorithm
Here is the source of QuorumPeer#createElectionAlgorithm:

    @SuppressWarnings("deprecation")
    protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;

        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            QuorumCnxManager qcm = createCnxnManager();
            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
            if (oldQcm != null) {
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                oldQcm.halt();
            }
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start();
                //--Initialize the two queues, sendqueue and recvqueue
                //--Keep a reference to the QuorumCnxManager created above
                //--Create Messenger's WorkerSender and WorkerReceiver threads
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                //--Start Messenger's WorkerSender and WorkerReceiver threads
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }

Focus on this part of case 3:

                //--Initialize the two queues, sendqueue and recvqueue
                //--Keep a reference to the QuorumCnxManager created above
                //--Create Messenger's WorkerSender and WorkerReceiver threads
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                //--Start Messenger's WorkerSender and WorkerReceiver threads
                fle.start();

Read together with the second diagram, this code is easy to follow: in the end it starts Messenger's WorkerSender and WorkerReceiver threads.

FastLeaderElection.Messenger.WorkerSender#run

            public void run() {
                while (!stop) {
                    try {
                        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                        if(m == null) continue;
                        //--Ends up calling QuorumCnxManager#toSend()
                        process(m);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
                LOG.info("WorkerSender is down");
            }
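WorkerSender (and WorkerReceiver, shown further on) polls its queue with a 3000 ms timeout instead of blocking forever. A minimal sketch of that loop shape follows; `PollingWorker` and its `demo` helper are hypothetical names, and the handler stands in for `process(m)`. The timeout is what lets the thread notice `stop = true` within one polling interval even when no messages arrive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the loop shape shared by WorkerSender and WorkerReceiver.
final class PollingWorker<T> extends Thread {
    private final BlockingQueue<T> queue;
    private final Consumer<T> handler;
    private final long pollMillis;
    volatile boolean stop = false;

    PollingWorker(BlockingQueue<T> queue, Consumer<T> handler, long pollMillis) {
        this.queue = queue;
        this.handler = handler;
        this.pollMillis = pollMillis;
        setDaemon(true);
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                // Wait at most pollMillis, so a set stop flag is noticed within one timeout.
                T m = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                handler.accept(m);   // WorkerSender calls process(m) at this point
            } catch (InterruptedException e) {
                break;               // like the original: an interrupt ends the loop
            }
        }
    }

    // Demo: push items through a worker, wait until all are handled, then stop it.
    static List<String> demo(List<String> items) {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(items.size());
        PollingWorker<String> w = new PollingWorker<>(q, s -> { seen.add(s); done.countDown(); }, 50);
        w.start();
        q.addAll(items);
        try {
            done.await(5, TimeUnit.SECONDS);
            w.stop = true;
            w.join(5000);   // returns within about one poll timeout after stop is set
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }
}
```

A single worker drains its queue in FIFO order, which is why messages for the election layer leave in the order they were queued.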

Call sequence:
FastLeaderElection.Messenger.WorkerSender#process->QuorumCnxManager#toSend

    public void toSend(Long sid, ByteBuffer b) {
        /*
         * If sending message to myself, then simply enqueue it (loopback).
         */
        //--The message is addressed to myself: put it straight into recvQueue
        if (this.mySid == sid) {
             b.position(0);
             addToRecvQueue(new Message(b.duplicate(), sid));
            /*
             * Otherwise send to the corresponding thread to send.
             */
        } else { //--Otherwise queue it in queueSendMap
             /*
              * Start a new connection if doesn't have one already.
              */
             ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
                SEND_CAPACITY);
             ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
             //--Add it to this sid's queue in queueSendMap
             if (oldq != null) {
                 addToSendQueue(oldq, b);
             } else {
                 addToSendQueue(bq, b);
             }
             connectOne(sid);

        }
    }
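The routing in `toSend` boils down to this: a message for our own sid is looped back into the receive queue, and anything else goes into a per-peer send queue. A minimal sketch follows, with sockets and `connectOne` left out; `ToSendRouter` is a hypothetical name. `addToSendQueue` is not shown in this article, so the drop-oldest behaviour and the capacity of 1 used here are assumptions.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of QuorumCnxManager#toSend's routing; no sockets involved.
final class ToSendRouter {
    // Assumption: a per-peer queue that holds only the newest message.
    static final int SEND_CAPACITY = 1;

    final long mySid;
    final BlockingQueue<ByteBuffer> recvQueue = new ArrayBlockingQueue<>(100);
    final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();

    ToSendRouter(long mySid) { this.mySid = mySid; }

    void toSend(long sid, ByteBuffer b) {
        if (sid == mySid) {
            // Loopback: a vote addressed to ourselves goes straight to the receive side.
            b.position(0);
            recvQueue.offer(b.duplicate());
            return;
        }
        // One send queue per peer; putIfAbsent makes concurrent callers share the same queue.
        BlockingQueue<ByteBuffer> fresh = new ArrayBlockingQueue<>(SEND_CAPACITY);
        BlockingQueue<ByteBuffer> existing = queueSendMap.putIfAbsent(sid, fresh);
        BlockingQueue<ByteBuffer> q = (existing != null) ? existing : fresh;
        synchronized (q) {
            // When full, drop the oldest entry so the newest vote is the one that gets sent.
            if (q.remainingCapacity() == 0) {
                q.poll();
            }
            q.offer(b);
        }
        // The real method then calls connectOne(sid) to ensure a connection exists.
    }
}
```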

The call sequence then continues:
QuorumCnxManager#connectOne->QuorumCnxManager#initiateConnection->QuorumCnxManager#startConnection

    private boolean startConnection(Socket sock, Long sid)
            throws IOException {
        DataOutputStream dout = null;
        DataInputStream din = null;
        try {
            // Use BufferedOutputStream to reduce the number of IP packets. This is
            // important for x-DC scenarios.
            BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
            dout = new DataOutputStream(buf);

            // Sending id and challenge
            // represents protocol version (in other words - message type)
            dout.writeLong(PROTOCOL_VERSION);
            dout.writeLong(self.getId());
            String addr = formatInetAddr(self.getElectionAddress());
            byte[] addr_bytes = addr.getBytes();
            dout.writeInt(addr_bytes.length);
            dout.write(addr_bytes);
            dout.flush();

            din = new DataInputStream(
                    new BufferedInputStream(sock.getInputStream()));
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            closeSocket(sock);
            return false;
        }

        // authenticate learner
        QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
        if (qps != null) {
            // TODO - investigate why reconfig makes qps null.
            authLearner.authenticate(sock, qps.hostname);
        }

        // If lost the challenge, then drop the new connection
        if (sid > self.getId()) {
            LOG.info("Have smaller server identifier, so dropping the " +
                    "connection: (" + sid + ", " + self.getId() + ")");
            closeSocket(sock);
            // Otherwise proceed with the connection
        } else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);

            if(vsw != null)
                vsw.finish();

            senderWorkerMap.put(sid, sw);
            queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                    SEND_CAPACITY));

            sw.start();
            rw.start();

            return true;

        }
        return false;
    }
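Two details of `startConnection` are easy to miss: the fixed header it writes first, and the tie-breaking rule that drops the connection when the peer's sid is larger. Because two servers may dial each other at the same time, that rule leaves at most one connection per pair: the one opened by the larger sid. The sketch below reproduces both with hypothetical helper names (`ConnectionHandshake`, `encodeHeader`, `keepOutgoingConnection`); it encodes the address as UTF-8, whereas the original calls `getBytes()` with the platform default charset.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers mirroring startConnection(); not ZooKeeper's own API.
final class ConnectionHandshake {
    // Same field order as startConnection(): protocol version, own sid,
    // election-address length, election-address bytes (big-endian, via DataOutputStream).
    static byte[] encodeHeader(long protocolVersion, long mySid, String electionAddr) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(bytes);
            byte[] addr = electionAddr.getBytes(StandardCharsets.UTF_8);
            dout.writeLong(protocolVersion);
            dout.writeLong(mySid);
            dout.writeInt(addr.length);
            dout.write(addr);
            dout.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);   // cannot happen with an in-memory stream
        }
    }

    // startConnection() drops the connection when the peer's sid is larger than ours
    // ("lost the challenge"), so an outgoing connection survives only toward smaller sids.
    static boolean keepOutgoingConnection(long mySid, long peerSid) {
        return peerSid <= mySid;
    }
}
```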

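Next, go straight to the run methods of QuorumCnxManager.SendWorker and QuorumCnxManager.RecvWorker. This is where messages are actually sent and received; everything before this point is layers of wrapping.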

QuorumCnxManager.SendWorker#run


        //--Send out the messages in queueSendMap
        @Override
        public void run() {
            threadCnt.incrementAndGet();
            try {
                /**
                 * If there is nothing in the queue to send, then we
                 * send the lastMessage to ensure that the last message
                 * was received by the peer. The message could be dropped
                 * in case self or the peer shutdown their connection
                 * (and exit the thread) prior to reading/processing
                 * the last message. Duplicate messages are handled correctly
                 * by the peer.
                 *
                 * If the send queue is non-empty, then we have a recent
                 * message than that stored in lastMessage. To avoid sending
                 * stale message, we should send the message in the send queue.
                 */
                //5--Look up this peer's queue in queueSendMap by its sid
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                if (bq == null || isSendQueueEmpty(bq)) {
                    //5--The last message sent to this peer
                    ByteBuffer b = lastMessageSent.get(sid);
                    if (b != null) {
                        LOG.debug("Attempting to send lastMessage to sid=" + sid);
                        send(b);
                    }
                }
            } catch (IOException e) {
                LOG.error("Failed to send last message. Shutting down thread.", e);
                this.finish();
            }

            try {
                while (running && !shutdown && sock != null) {

                    ByteBuffer b = null;
                    try {
                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                                .get(sid);
                        if (bq != null) {
                            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                        } else {
                            LOG.error("No queue of incoming messages for " +
                                      "server " + sid);
                            break;
                        }

                        //--Record it in lastMessageSent (a per-sid map), then send it
                        if(b != null){
                            lastMessageSent.put(sid, b);
                            send(b);
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for message on queue",
                                e);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Exception when using channel: for id " + sid
                         + " my id = " + QuorumCnxManager.this.mySid
                         + " error = " + e);
            }
            //--Close the connection
            this.finish();
            LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
        }
    }
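The first `try` block in `SendWorker#run` decides whether to resend the last message before entering the main loop. Extracted as a pure function (a hypothetical `ResendPolicy`, using `Queue` and `Optional` in place of ZooKeeper's own types), the decision looks like this:

```java
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Queue;

// Hypothetical sketch of the start-up decision in SendWorker#run.
final class ResendPolicy {
    static Optional<ByteBuffer> messageToResendOnStart(Queue<ByteBuffer> sendQueue, ByteBuffer lastSent) {
        boolean queueEmpty = (sendQueue == null) || sendQueue.isEmpty();
        if (queueEmpty && lastSent != null) {
            // The previous connection may have died before the peer read this message.
            return Optional.of(lastSent);
        }
        // Either nothing was ever sent, or a newer message is already queued and the
        // main loop will send it; resending the old one would only deliver a stale vote.
        return Optional.empty();
    }
}
```

Resending is safe because, as the source comment says, the peer handles duplicate messages correctly.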

Here is send():

        synchronized void send(ByteBuffer b) throws IOException {
            byte[] msgBytes = new byte[b.capacity()];
            try {
                b.position(0);
                b.get(msgBytes);
            } catch (BufferUnderflowException be) {
                LOG.error("BufferUnderflowException ", be);
                return;
            }
            //--dout is a DataOutputStream that writes to the socket
            dout.writeInt(b.capacity());
            dout.write(b.array());
            dout.flush();
        }
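`send()` frames every message as a 4-byte length followed by the payload, written through `DataOutputStream` (big-endian). A minimal in-memory sketch of that framing follows, with a hypothetical `FrameWriter` that writes to a byte array instead of a socket:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical sketch of send()'s length-prefixed framing, written to memory.
final class FrameWriter {
    static byte[] frame(ByteBuffer b) {
        // Like send(), copy the whole buffer: position 0 up to its capacity.
        byte[] msgBytes = new byte[b.capacity()];
        ByteBuffer view = b.duplicate();   // leave the caller's position untouched
        view.clear();                      // position 0, limit = capacity
        view.get(msgBytes);
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            DataOutputStream dout = new DataOutputStream(out);
            dout.writeInt(msgBytes.length);   // 4-byte big-endian length prefix
            dout.write(msgBytes);             // then the payload itself
            dout.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);   // cannot happen with an in-memory stream
        }
    }
}
```

One small difference: the sketch writes the copied bytes, while `send()` copies into `msgBytes` and then writes `b.array()`; for a heap buffer that wraps a whole array these are the same bytes.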

Next, look at QuorumCnxManager.RecvWorker#run:

        //--Receive messages
        @Override
        public void run() {
            threadCnt.incrementAndGet();
            try {
                while (running && !shutdown && sock != null) {
                    /**
                     * Reads the first int to determine the length of the
                     * message
                     */
                    //--din is a DataInputStream that reads from the socket
                    int length = din.readInt();
                    if (length <= 0 || length > PACKETMAXSIZE) {
                        throw new IOException(
                                "Received packet with invalid packet: "
                                        + length);
                    }
                    /**
                     * Allocates a new ByteBuffer to receive the message
                     */
                    byte[] msgArray = new byte[length];
                    din.readFully(msgArray, 0, length);
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
                    //--Add to recvQueue
                    //--If recvQueue is full, first drop the message at its head, then append this one at the tail
                    addToRecvQueue(new Message(message.duplicate(), sid));
                }
            } catch (Exception e) {
                LOG.warn("Connection broken for id " + sid + ", my id = "
                         + QuorumCnxManager.this.mySid + ", error = " , e);
            } finally {
                LOG.warn("Interrupting SendWorker");
                sw.finish();
                closeSocket(sock);
            }
        }
    }
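On the receiving side, `RecvWorker` reverses that framing and rejects lengths that are non-positive or above `PACKETMAXSIZE`, which keeps a corrupt stream from triggering a huge allocation. A sketch with a hypothetical `FrameReader`; the 512 KB limit is an assumed value for `PACKETMAXSIZE`, which this article does not show:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical sketch of RecvWorker's read step.
final class FrameReader {
    static final int MAX_PACKET = 512 * 1024;   // assumed stand-in for PACKETMAXSIZE

    static ByteBuffer readFrame(DataInputStream din) throws IOException {
        int length = din.readInt();
        if (length <= 0 || length > MAX_PACKET) {
            // RecvWorker treats this as a broken connection and closes the socket.
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] msg = new byte[length];
        din.readFully(msg, 0, length);   // blocks until all bytes arrive; EOF -> EOFException
        return ByteBuffer.wrap(msg);
    }

    // Convenience for in-memory use: returns null instead of throwing on a bad frame.
    static ByteBuffer tryRead(byte[] bytes) {
        try {
            return readFrame(new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
            return null;
        }
    }
}
```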

Here is addToRecvQueue():

    //5--When full, drop the head (oldest) and keep the tail (newest)
    public void addToRecvQueue(Message msg) {
        synchronized(recvQLock) {
            if (recvQueue.remainingCapacity() == 0) {
                try {
                    recvQueue.remove();
                } catch (NoSuchElementException ne) {
                    // element could be removed by poll()
                    LOG.debug("Trying to remove from an empty " +
                        "recvQueue. Ignoring exception " + ne);
                }
            }
            try {
                recvQueue.add(msg);
            } catch (IllegalStateException ie) {
                // This should never happen
                LOG.error("Unable to insert element in the recvQueue " + ie);
            }
        }
    }
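`addToRecvQueue` implements a drop-oldest bounded queue: when it is full it evicts the head, so adding never blocks the RecvWorker threads and, under overload, old messages are lost rather than new ones. A generic sketch with a hypothetical `DropOldestQueue`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch of addToRecvQueue's drop-oldest policy.
final class DropOldestQueue<T> {
    private final ArrayBlockingQueue<T> queue;
    private final Object lock = new Object();

    DropOldestQueue(int capacity) { queue = new ArrayBlockingQueue<>(capacity); }

    void add(T msg) {
        // The lock makes "check capacity, evict, insert" atomic with respect to other adders.
        synchronized (lock) {
            if (queue.remainingCapacity() == 0) {
                // poll() returns null instead of throwing if a consumer emptied the queue meanwhile.
                queue.poll();
            }
            queue.offer(msg);
        }
    }

    T poll() { return queue.poll(); }

    List<T> snapshot() { return new ArrayList<>(queue); }
}
```

Consumers poll without taking the lock, which is exactly the race the original handles by catching `NoSuchElementException`; using `poll()` for the eviction sidesteps it.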

Now look at the second diagram again; it really is important. Back in FastLeaderElection, look at FastLeaderElection.Messenger.WorkerReceiver#run:

            //5--FastLeaderElection's WorkerReceiver thread turns messages from QuorumCnxManager's recvQueue into Notifications and puts them into FastLeaderElection's recvqueue
            public void run() {

                Message response;
                while (!stop) {
                    // Sleeps on receive
                    try {
                        //5--Take a message from QuorumCnxManager's recvQueue
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                        if(response == null) continue;

                        // The current protocol and two previous generations all send at least 28 bytes
                        if (response.buffer.capacity() < 28) {
                            LOG.error("Got a short response: " + response.buffer.capacity());
                            continue;
                        }

                        // this is the backwardCompatibility mode in place before ZK-107
                        // It is for a version of the protocol in which we didn't send peer epoch
                        // With peer epoch and version the message became 40 bytes
                        boolean backCompatibility28 = (response.buffer.capacity() == 28);

                        // this is the backwardCompatibility mode for no version information
                        boolean backCompatibility40 = (response.buffer.capacity() == 40);
                        
                        response.buffer.clear();

                        // Instantiate Notification and set its attributes
                        Notification n = new Notification();

                        int rstate = response.buffer.getInt();
                        long rleader = response.buffer.getLong();
                        long rzxid = response.buffer.getLong();
                        long relectionEpoch = response.buffer.getLong();
                        long rpeerepoch;

                        int version = 0x0;
                        if (!backCompatibility28) {
                            rpeerepoch = response.buffer.getLong();
                            if (!backCompatibility40) {
                                /*
                                 * Version added in 3.4.6
                                 */
                                
                                version = response.buffer.getInt();
                            } else {
                                LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
                            }
                        } else {
                            LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
                            rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                        }

                        QuorumVerifier rqv = null;

                        // check if we have a version that includes config. If so extract config info from message.
                        if (version > 0x1) {
                            int configLength = response.buffer.getInt();
                            byte b[] = new byte[configLength];

                            response.buffer.get(b);
                                                       
                            synchronized(self) {
                                try {
                                    rqv = self.configFromString(new String(b));
                                    QuorumVerifier curQV = self.getQuorumVerifier();
                                    if (rqv.getVersion() > curQV.getVersion()) {
                                        LOG.info("{} Received version: {} my version: {}", self.getId(),
                                                Long.toHexString(rqv.getVersion()),
                                                Long.toHexString(self.getQuorumVerifier().getVersion()));
                                        if (self.getPeerState() == ServerState.LOOKING) {
                                            LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
                                            self.processReconfig(rqv, null, null, false);
                                            if (!rqv.equals(curQV)) {
                                                LOG.info("restarting leader election");
                                                self.shuttingDownLE = true;
                                                self.getElectionAlg().shutdown();

                                                break;
                                            }
                                        } else {
                                            LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
                                        }
                                    }
                                } catch (IOException e) {
                                    LOG.error("Something went wrong while processing config received from {}", response.sid);
                                } catch (ConfigException e) {
                                    LOG.error("Something went wrong while processing config received from {}", response.sid);
                                }
                            }                          
                        } else {
                            LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
                        }
                       
                        /*
                         * If it is from a non-voting server (such as an observer or
                         * a non-voting follower), respond right away.
                         */
                        if(!validVoter(response.sid)) {
                            Vote current = self.getCurrentVote();
                            QuorumVerifier qv = self.getQuorumVerifier();
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                    current.getId(),
                                    current.getZxid(),
                                    logicalclock.get(),
                                    self.getPeerState(),
                                    response.sid,
                                    current.getPeerEpoch(),
                                    qv.toString().getBytes());

                            sendqueue.offer(notmsg);
                        } else {
                            // Receive new message
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Receive new notification message. My id = "
                                        + self.getId());
                            }

                            // State of peer that sent this message
                            QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                            switch (rstate) {
                            case 0:
                                ackstate = QuorumPeer.ServerState.LOOKING;
                                break;
                            case 1:
                                ackstate = QuorumPeer.ServerState.FOLLOWING;
                                break;
                            case 2:
                                ackstate = QuorumPeer.ServerState.LEADING;
                                break;
                            case 3:
                                ackstate = QuorumPeer.ServerState.OBSERVING;
                                break;
                            default:
                                continue;
                            }

                            n.leader = rleader;
                            n.zxid = rzxid;
                            n.electionEpoch = relectionEpoch;
                            n.state = ackstate;
                            n.sid = response.sid;
                            n.peerEpoch = rpeerepoch;
                            n.version = version;
                            n.qv = rqv;
                            /*
                             * Print notification info
                             */
                            if(LOG.isInfoEnabled()){
                                printNotification(n);
                            }

                            /*
                             * If this server is looking, then send proposed leader
                             */

                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                                //5---Put it into recvqueue to be processed
                                recvqueue.offer(n);

                                /*
                                 * Send a notification back if the peer that sent this
                                 * message is also looking and its logical clock is
                                 * lagging behind.
                                 */
                                //--The peer is also LOOKING but its logical clock (election round) is behind ours: send it our current vote so it can catch up
                                if((ackstate == QuorumPeer.ServerState.LOOKING)
                                        && (n.electionEpoch < logicalclock.get())){
                                    Vote v = getVote();
                                    QuorumVerifier qv = self.getQuorumVerifier();
                                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                                            v.getId(),
                                            v.getZxid(),
                                            logicalclock.get(),
                                            self.getPeerState(),
                                            response.sid,
                                            v.getPeerEpoch(),
                                            qv.toString().getBytes());
                                    sendqueue.offer(notmsg);
                                }
                            } else {
                                /*
                                 * If this server is not looking, but the one that sent the ack
                                 * is looking, then send back what it believes to be the leader.
                                 */
                                //--This server is not LOOKING but the peer is: send the peer the vote for the leader this server currently believes in
                                Vote current = self.getCurrentVote();
                                if(ackstate == QuorumPeer.ServerState.LOOKING){
                                    if(LOG.isDebugEnabled()){
                                        LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
                                                self.getId(),
                                                response.sid,
                                                Long.toHexString(current.getZxid()),
                                                current.getId(),
                                                Long.toHexString(self.getQuorumVerifier().getVersion()));
                                    }

                                    QuorumVerifier qv = self.getQuorumVerifier();
                                    ToSend notmsg = new ToSend(
                                            ToSend.mType.notification,
                                            current.getId(),
                                            current.getZxid(),
                                            current.getElectionEpoch(),
                                            self.getPeerState(),
                                            response.sid,
                                            current.getPeerEpoch(),
                                            qv.toString().getBytes());
                                    //Picked up later and handed to the per-peer send queues (queueSendMap)
                                    sendqueue.offer(notmsg);
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted Exception while waiting for new message" +
                                e.toString());
                    }
                }
                LOG.info("WorkerReceiver is down");
            }

Focus on this part of the code:


                            /*
                             * If this server is looking, then send proposed leader
                             */

                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                                //5---Put the notification into recvqueue to be processed later
                                recvqueue.offer(n);

                                /*
                                 * Send a notification back if the peer that sent this
                                 * message is also looking and its logical clock is
                                 * lagging behind.
                                 */
                                //--The peer is also LOOKING and its election round lags behind ours: send it our current vote so it can catch up
                                if((ackstate == QuorumPeer.ServerState.LOOKING)
                                        && (n.electionEpoch < logicalclock.get())){
                                    Vote v = getVote();
                                    QuorumVerifier qv = self.getQuorumVerifier();
                                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                                            v.getId(),
                                            v.getZxid(),
                                            logicalclock.get(),
                                            self.getPeerState(),
                                            response.sid,
                                            v.getPeerEpoch(),
                                            qv.toString().getBytes());
                                    sendqueue.offer(notmsg);
                                }
                            } else {
                                /*
                                 * If this server is not looking, but the one that sent the ack
                                 * is looking, then send back what it believes to be the leader.
                                 */
                                //--This server is not LOOKING but the peer is: send back the leader this server believes in
                                Vote current = self.getCurrentVote();
                                if(ackstate == QuorumPeer.ServerState.LOOKING){
                                    if(LOG.isDebugEnabled()){
                                        LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
                                                self.getId(),
                                                response.sid,
                                                Long.toHexString(current.getZxid()),
                                                current.getId(),
                                                Long.toHexString(self.getQuorumVerifier().getVersion()));
                                    }

                                    QuorumVerifier qv = self.getQuorumVerifier();
                                    ToSend notmsg = new ToSend(
                                            ToSend.mType.notification,
                                            current.getId(),
                                            current.getZxid(),
                                            current.getElectionEpoch(),
                                            self.getPeerState(),
                                            response.sid,
                                            current.getPeerEpoch(),
                                            qv.toString().getBytes());
                                    //Picked up later and handed to the per-peer send queues (queueSendMap)
                                    sendqueue.offer(notmsg);
                                }
                            }

Election logic

First, there is the following vote class:

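import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;

public class Vote {
    // Version of the vote (notification) format
    final private int version;
    // Server ID (myid) of the proposed leader
    final private long id;
    // Last transaction ID (zxid) of the proposed leader
    final private long zxid;
    // Logical clock: tells whether several votes belong to the same election round
    final private long electionEpoch;
    // Epoch of the proposed leader
    final private long peerEpoch;
    // State of the server that cast the vote
    final private ServerState state;
    // Constructors, getters, equals() and hashCode() omitted
}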
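lookForLeader compares votes on three of these fields. Below is a minimal sketch of that ordering, modeled on `FastLeaderElection#totalOrderPredicate`: `SimpleVote` is a hypothetical stand-in for `Vote`, and the weight check the real method performs first (rejecting zero-weight servers in hierarchical quorums) is left out. `electionEpoch` is deliberately not part of the comparison; lookForLeader checks it separately against `logicalclock`.

```java
// Hypothetical stand-in for Vote, keeping only the fields the comparison needs.
record SimpleVote(long id, long zxid, long electionEpoch, long peerEpoch) {

    // Ordering modeled on FastLeaderElection#totalOrderPredicate (weight check omitted):
    // true if the new proposal (newId, newZxid, newEpoch) beats the current one.
    static boolean totalOrder(long newId, long newZxid, long newEpoch,
                              long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;   // 1. higher peerEpoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;     // 2. then higher zxid (more recent data)
        }
        return newId > curId;             // 3. finally higher server id (myid)
    }

    // True if this vote's proposed leader beats the other vote's proposed leader.
    // electionEpoch is ignored here; lookForLeader compares it with logicalclock separately.
    boolean beats(SimpleVote other) {
        return totalOrder(id, zxid, peerEpoch, other.id, other.zxid, other.peerEpoch);
    }
}
```

For example, `new SimpleVote(1, 0x200, 1, 1)` beats `new SimpleVote(3, 0x100, 1, 1)` because its zxid is higher, even though its server id is lower.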

Core algorithm: FastLeaderElection#lookForLeader (it can be run with read-only mode enabled or in plain blocking mode)

    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
           self.start_fle = Time.currentElapsedTime();
        }
        try {
            //--Stores the received Notifications (votes), keyed by sender sid
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
                //5--Increment the election round (logical clock)
                logicalclock.incrementAndGet();
                //5--Propose itself as leader; nothing is sent yet, the broadcast happens below
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            //--Put the initial vote into sendqueue, to be sent to everyone else
            sendNotifications();

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                //5--recvqueue is filled by the Messenger, and also by notifications pushed back below when the candidate changes
                //5--notTimeout: poll timeout
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){
                    //5--Check the network send queues (queueSendMap); if messages have been delivered, send the notifications again
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                        //5--Otherwise reconnect to all peers
                        //5--queueSendMap is keyed by each server's sid
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    //5--No vote arrived before the timeout: back off and use a longer timeout next time (capped at maxNotificationInterval)
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
                else if (validVoter(n.sid) && validVoter(n.leader)) {
                    //5--Dispatch on the sender's state
                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view for a replica in the current or next voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
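                        //5--A vote from a newer election round (electionEpoch) always takes over;
                        //within the same round, totalOrderPredicate compares peerEpoch,
                        //then zxid, then myid
                        // If notification > current, replace and send messages out
                        //5--The peer's election round is ahead of ours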
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            //5--Clear the votes collected so far
                            recvset.clear();
                            //5--Compare the peer's vote with our initial vote (myid, zxid, peerEpoch)
                            //5--Order: 1: peerEpoch, 2: zxid, 3: myid (leader id); the larger wins
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                //5--Adopt the winning side's myid, zxid and peerEpoch
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            //5--Tell everyone
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {//5--Stale round: ignore the peer's vote
                            if(LOG.isDebugEnabled()){
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {//5--Same round: same comparison as in the first branch, against the current proposal
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            //5--Tell everyone
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        // don't care about the version if it's in LOOKING state
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        //5--Check whether the current candidate (proposedLeader, proposedZxid, proposedEpoch) holds a majority of the votes,
                        //--via QuorumHierarchical.containsQuorum() or QuorumMaj.containsQuorum()
                        //5--termPredicate() decides, from the votes received so far, whether the final leader can be confirmed: it checks
                        // whether more than half of the servers propose the same leader as this server. If so, to be safe it waits up to
                        // finalizeWait (200 ms by default) for a final confirmation. If a better leader shows up in that window, the
                        // Notification is put back into recvqueue and the election continues. Otherwise the election ends and, depending
                        // on whether the elected leader is this server, it sets its state to LEADING, OBSERVING or FOLLOWING.
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            //--Check whether the chosen candidate has been superseded
                            //--Note the finalizeWait delay on this poll
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    //--Superseded: put it back into recvqueue and go around the loop again
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            //5--No better vote arrived: the leader is elected; check whether it is this server
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, logicalclock.get(), 
                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        //5--Same election round (logical clock)
                        if(n.electionEpoch == logicalclock.get()){
                            //5--Store in recvset
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            //5--Does the peer's leader hold a majority, and does checkLeader confirm it?
                            if(termPredicate(recvset, new Vote(n.version, n.leader,
                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                                Vote endVote = new Vote(n.leader, 
                                        n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         */
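                        //--Otherwise (e.g. a different round: this server probably crashed and came back up), store the peer's vote in outofelection;
                        //--adopt that leader if its votes in outofelection form a majority and the leader confirms it is leading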
                        outofelection.put(n.sid, new Vote(n.version, n.leader, 
                                n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        //--Majority reached
                        if (termPredicate(outofelection, new Vote(n.version, n.leader,
                                n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                //--The leader confirms it is leading
                                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized(this){
                                logicalclock.set(n.electionEpoch);
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, 
                                    n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: " + n.state
                              + " (n.state), " + n.sid + " (n.sid)");
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if(self.jmxLeaderElectionBean != null){
                    MBeanRegistry.getInstance().unregister(
                            self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}",
                    manager.getConnectionThreadCount());
        }
    }
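The majority test behind termPredicate can be sketched as follows, assuming equal-weight voters as in QuorumMaj (QuorumHierarchical's weighted groups are not modeled). `MajorityCheck` and `Proposal` are hypothetical names; the equality check only roughly mirrors how `Vote.equals` compares LOOKING votes, and the real code additionally restricts the acknowledging sids to voting members.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of termPredicate + QuorumMaj.containsQuorum (equal-weight voters).
final class MajorityCheck {

    // A proposal as compared for LOOKING votes: leader, zxid and both epochs must match.
    record Proposal(long leader, long zxid, long electionEpoch, long peerEpoch) {}

    // sids whose recorded vote (recvset is keyed by sender sid) equals the target proposal.
    static Set<Long> supporters(Map<Long, Proposal> votes, Proposal target) {
        Set<Long> acks = new HashSet<>();
        for (Map.Entry<Long, Proposal> e : votes.entrySet()) {
            if (target.equals(e.getValue())) {
                acks.add(e.getKey());
            }
        }
        return acks;
    }

    // QuorumMaj-style rule: strictly more than half of the voting members.
    static boolean containsQuorum(Set<Long> acks, int votingMembers) {
        return acks.size() > votingMembers / 2;
    }

    static boolean termPredicate(Map<Long, Proposal> votes, Proposal target, int votingMembers) {
        return containsQuorum(supporters(votes, target), votingMembers);
    }

    private MajorityCheck() {}
}
```

With three voters, two matching votes are enough; with five voters, the same two votes are not, because 2 is not greater than 5 / 2 = 2 in integer division.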

The logic of lookForLeader is:
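1: Increment the election round and send a vote proposing itself as leader to the other servers.
2: Enter the voting loop for this round and stay in it until the server is no longer LOOKING.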
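Inside the loop of step 2, every empty `recvqueue.poll(notTimeout, ...)` doubles the timeout before the next poll. A small sketch of the resulting schedule, assuming `finalizeWait` = 200 ms and `maxNotificationInterval` = 60 000 ms; the cap value is an assumption here, so check it against your ZooKeeper version. `NotificationBackoff` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the poll-timeout backoff in lookForLeader.
final class NotificationBackoff {
    static final int FINALIZE_WAIT_MS = 200;                // initial notTimeout = finalizeWait
    static final int MAX_NOTIFICATION_INTERVAL_MS = 60_000; // assumed maxNotificationInterval

    // After an empty poll: double the timeout, capped at the maximum (same expression as the source).
    static int nextTimeout(int notTimeout) {
        int tmpTimeOut = notTimeout * 2;
        return tmpTimeOut < MAX_NOTIFICATION_INTERVAL_MS ? tmpTimeOut : MAX_NOTIFICATION_INTERVAL_MS;
    }

    // Timeouts used by `polls` consecutive recvqueue.poll(...) calls that all return null.
    static List<Integer> schedule(int polls) {
        List<Integer> timeouts = new ArrayList<>();
        int notTimeout = FINALIZE_WAIT_MS;
        for (int i = 0; i < polls; i++) {
            timeouts.add(notTimeout);
            notTimeout = nextTimeout(notTimeout);
        }
        return timeouts;
    }

    private NotificationBackoff() {}
}
```

Under these assumptions the timeouts run 200, 400, 800 ms and so on, and the tenth consecutive empty poll already uses the 60 000 ms cap; between polls the real loop either resends its notifications or calls `manager.connectAll()`.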

When a vote arrives, the server checks the sender's state:

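  • LOOKING: compare the two votes. The election round (electionEpoch) comes first: a vote from a newer round makes this server move to that round, clear recvset and re-propose, while a vote from an older round is ignored. Within the same round, totalOrderPredicate compares peerEpoch, then zxid, then server id, and the higher one wins. Whenever its proposal changes, the server tells everyone its new vote. It then checks whether the current candidate (proposedLeader, proposedZxid, proposedEpoch) holds a majority of the votes received; this is done in FastLeaderElection#termPredicate, which relies on one of two implementations, QuorumMaj.containsQuorum() or QuorumHierarchical.containsQuorum(). After that it still checks, within finalizeWait, whether a better leader has been proposed; if so, that notification is put back into recvqueue and the loop runs again.
  • OBSERVING: the sender is an observer, so its vote is simply ignored.
  • FOLLOWING and LEADING: if the sender is in the same round as this server (electionEpoch equals logicalclock), it has already finished the election. Its vote goes into recvset, and the server adopts that leader if the votes for it form a majority (termPredicate) and checkLeader confirms it, i.e. the leader is this server in the current round, or the leader's own vote reports LEADING. If that does not settle it, in particular when the sender is in a different round (typically because this server crashed and came back up), the vote is recorded in outofelection. Once the votes in outofelection form a majority and the proposed leader confirms it is leading, the server updates its election round to that epoch and switches its state to LEADING, or to FOLLOWING/OBSERVING via learningState().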
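The checkLeader condition used in the FOLLOWING/LEADING branch can be sketched as below. It follows the logic of `FastLeaderElection#checkLeader`, assuming it matches the version quoted here, with simplified hypothetical types (`LeaderCheck`, `PeerVote`); the server's own sid and logical clock are passed in as parameters instead of being read from QuorumPeer.

```java
import java.util.Map;

// Hypothetical sketch of FastLeaderElection#checkLeader with simplified types.
final class LeaderCheck {
    enum State { LOOKING, FOLLOWING, LEADING, OBSERVING }

    // The parts of a recorded vote that checkLeader looks at.
    record PeerVote(long leader, long electionEpoch, State state) {}

    static boolean checkLeader(Map<Long, PeerVote> votes, long leader, long electionEpoch,
                               long mySid, long logicalClock) {
        if (leader != mySid) {
            // Someone else is named leader: we must have a vote from that server itself,
            // and it must report that it is LEADING.
            PeerVote fromLeader = votes.get(leader);
            return fromLeader != null && fromLeader.state() == State.LEADING;
        }
        // We are the named leader: only accept it for our current election round.
        return logicalClock == electionEpoch;
    }

    private LeaderCheck() {}
}
```

The point of the check is that a majority of followers agreeing on a leader is not enough on its own: the leader must also have been heard from directly, which protects a restarted server from following a leader that is already gone.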

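That is all for this article. Read the first part together with the two diagrams, otherwise it is easy to get lost; the election logic in the second part should be fairly easy to follow from the text.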

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.