ZooKeeper Leader and Follower Initialization

Preface

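The earlier article, "zookeeper ZAB Leader Elect source code analysis", covered ZooKeeper's leader election in detail. This article analyzes how the Leader and the Followers initialize once the election is over.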

Initialization diagrams

Once the leader has been elected, the leader and the followers move on to forming the cluster and synchronizing data. This involves the following steps:

1. Establishing connections

The leader starts the connection listener LearnerCnxAcceptorHandler on the first port configured for its address in zoo.cfg and uses it to listen for connection requests from followers.


connect_init.png
2. Determining the new epoch

The newly formed cluster needs a new epoch that tells whether all members are currently working in the same round.


new_epoch.png
3. Data synchronization

Once the new epoch is settled, the cluster starts data recovery. When recovery completes, the request processing engines of the followers and the leader start, and the cluster can serve clients.


data_sync.png

The rest of the article walks through the source code in detail.

Leader

After several rounds of voting the Leader is elected, and the QuorumPeer whose state is LEADING enters the leading phase.


while (running) {
    switch (getPeerState()) {
    case LOOKING:   // ... omitted
    case FOLLOWING: // ... omitted
    case OBSERVING: // ... omitted
    case LEADING:
        LOG.info("LEADING");
        try {
            // Create the Leader object
            setLeader(makeLeader(logFactory));
            // Start leading the cluster
            leader.lead();
            setLeader(null);
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                setLeader(null);
            }
            updateServerState();
        }
        break;
    }
}

QuorumPeer creates the Leader object

The Leader class extends LearnerMaster. It has too many fields to introduce here, so each one is explained in detail when it comes up later.

new Leader
// LeaderZooKeeperServer extends ZooKeeperServer and represents the ZooKeeper service of a node in the Leader role
public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
        this.self = self;
        this.proposalStats = new BufferStats();
        // Get the IP addresses and ports the Leader node listens on
        Set<InetSocketAddress> addresses;
        if (self.getQuorumListenOnAllIPs()) {
            addresses = self.getQuorumAddress().getWildcardAddresses();
        } else {
            addresses = self.getQuorumAddress().getAllAddresses();
        }

        addresses.stream()
          .map(address -> createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum()))
          .filter(Optional::isPresent)
          .map(Optional::get)
          .forEach(serverSockets::add);

        if (serverSockets.isEmpty()) {
            throw new IOException("Leader failed to initialize any of the following sockets: " + addresses);
        }

        this.zk = zk;
    }
new LeaderZooKeeperServer

Initializing LeaderZooKeeperServer ends by running the ZooKeeperServer constructor. I analyzed the code below in the article on the startup of the standalone ZooKeeper server, so I will not repeat it here.

 public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
        serverStats = new ServerStats(this);
        this.txnLogFactory = txnLogFactory;
        this.txnLogFactory.setServerStats(this.serverStats);
        this.zkDb = zkDb;
        this.tickTime = tickTime;
        setMinSessionTimeout(minSessionTimeout);
        setMaxSessionTimeout(maxSessionTimeout);
        this.listenBacklog = clientPortListenBacklog;
        this.reconfigEnabled = reconfigEnabled;

        listener = new ZooKeeperServerListenerImpl(this);

        readResponseCache = new ResponseCache(Integer.getInteger(
            GET_DATA_RESPONSE_CACHE_SIZE,
            ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));

        getChildrenResponseCache = new ResponseCache(Integer.getInteger(
            GET_CHILDREN_RESPONSE_CACHE_SIZE,
            ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));

        this.initialConfig = initialConfig;

        this.requestPathMetricsCollector = new RequestPathMetricsCollector();

        this.initLargeRequestThrottlingSettings();

        LOG.info(
            "Created server with"
                + " tickTime {}"
                + " minSessionTimeout {}"
                + " maxSessionTimeout {}"
                + " clientPortListenBacklog {}"
                + " datadir {}"
                + " snapdir {}",
            tickTime,
            getMinSessionTimeout(),
            getMaxSessionTimeout(),
            getClientPortListenBacklog(),
            txnLogFactory.getDataDir(),
            txnLogFactory.getSnapDir());
    }

We will come back to LeaderZooKeeperServer when analyzing the Leader's request processing chain, so we set it aside for now.

Leader.lead

Once the Leader is created, it enters the leading phase through the lead() method. The method is long, so we go through its source piece by piece, starting with the first part.

            // Set the ZAB state to DISCOVERY
            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
            self.tick.set(0);
            // The ZooKeeperServer loads its local data. Normally the data was already loaded during
            // leader election, so loadData() here only takes a snapshot of the local data
            zk.loadData();

            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

            // Start thread that waits for connection requests from
            // new followers.
            // LearnerCnxAcceptor manages the leader-side listener threads: it creates the
            // LearnerCnxAcceptorHandler listeners that wait for follower connections
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();
            // lead() waits here until more than half of the participants (the leader included)
            // have arrived, and then the new epoch is generated
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

LearnerCnxAcceptor

During lead() the Leader creates a LearnerCnxAcceptor. Let's look at the implementation of its run() method.

 public void run() {
            if (!stop.get() && !serverSockets.isEmpty()) {
                ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size());
                CountDownLatch latch = new CountDownLatch(serverSockets.size());
                // Create one LearnerCnxAcceptorHandler per address the leader is bound to
                serverSockets.forEach(serverSocket ->
                        executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch)));

                try {
                    latch.await();
                } catch (InterruptedException ie) {
                    LOG.error("Interrupted while sleeping in LearnerCnxAcceptor.", ie);
                } finally {
                    closeSockets();
                    executor.shutdown();
                    try {
                        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                            LOG.error("not all the LearnerCnxAcceptorHandler terminated properly");
                        }
                    } catch (InterruptedException ie) {
                        LOG.error("Interrupted while terminating LearnerCnxAcceptor.", ie);
                    }
                }
            }
        }
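The fan-out pattern used by LearnerCnxAcceptor.run() can be tried in isolation. The following is a minimal sketch, not ZooKeeper code: the class `AcceptorPoolSketch`, the method `runHandlers`, and the use of endpoint strings in place of real server sockets are all assumptions made for illustration. It starts one worker per endpoint and blocks on a CountDownLatch until every worker has finished, which is the same shape as the excerpt above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for LearnerCnxAcceptor: one worker per listening endpoint.
class AcceptorPoolSketch {
    // Runs one handler per endpoint and returns once every handler has counted down.
    // The endpoint list must be non-empty, mirroring the !serverSockets.isEmpty() guard.
    static List<String> runHandlers(List<String> endpoints) {
        List<String> finished = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(endpoints.size());
        CountDownLatch latch = new CountDownLatch(endpoints.size());
        for (String endpoint : endpoints) {
            executor.submit(() -> {
                try {
                    finished.add(endpoint); // a real handler would loop on accept() here
                } finally {
                    latch.countDown();      // always signal completion, even on failure
                }
            });
        }
        try {
            latch.await();                  // block until all handlers are done
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
        return finished;
    }
}
```

Counting down in a finally block matters: if a handler died without doing so, the coordinating thread would wait forever.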

LearnerCnxAcceptorHandler

This is the thread the Leader uses to accept connection requests from followers.
Let's look at the run() method of LearnerCnxAcceptorHandler.

public void run() {
                try {
                    Thread.currentThread().setName("LearnerCnxAcceptorHandler-" + serverSocket.getLocalSocketAddress());

                    while (!stop.get()) {
                        // acceptConnections makes the Leader's ServerSocket listen for follower connection requests
                        acceptConnections();
                    }
                } catch (Exception e) {
                    LOG.warn("Exception while accepting follower", e);
                    if (fail.compareAndSet(false, true)) {
                        handleException(getName(), e);
                        halt();
                    }
                } finally {
                    latch.countDown();
                }
            }
LearnerCnxAcceptorHandler.acceptConnections
  private void acceptConnections() throws IOException {
                Socket socket = null;
                boolean error = false;
                try {
                    // Accept a follower's connection request
                    socket = serverSocket.accept();

                    // start with the initLimit, once the ack is processed
                    // in LearnerHandler switch to the syncLimit
                    // Set the socket read timeout
                    socket.setSoTimeout(self.tickTime * self.initLimit);
                    socket.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
                    // Create a LearnerHandler thread that represents the follower and handles its requests
                    LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    error = true;
                    if (stop.get()) {
                        LOG.warn("Exception while shutting down acceptor.", e);
                    } else {
                        throw e;
                    }
                } catch (SaslException e) {
                    LOG.error("Exception while connecting to quorum learner", e);
                    error = true;
                } catch (Exception e) {
                    error = true;
                    throw e;
                } finally {
                    // Don't leak sockets on errors
                    if (error && socket != null && !socket.isClosed()) {
                        try {
                            socket.close();
                        } catch (IOException e) {
                            LOG.warn("Error closing socket: " + socket, e);
                        }
                    }
                }
            }
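The socket timeout set in acceptConnections is simply tickTime multiplied by initLimit. As a worked example, the sketch below plugs in sample values; the class `InitTimeoutSketch` and the numbers (tickTime of 2000 ms, initLimit of 10 ticks, syncLimit of 5 ticks) are assumptions chosen for illustration, not values taken from the article.

```java
// Hypothetical helper showing how tick-based limits translate into milliseconds.
class InitTimeoutSketch {
    // Timeout used while a learner is still connecting and synchronizing.
    static int initTimeoutMillis(int tickTimeMs, int initLimitTicks) {
        return Math.multiplyExact(tickTimeMs, initLimitTicks);
    }

    // Timeout a handler would switch to once the learner is in sync (see the source comment above).
    static int syncTimeoutMillis(int tickTimeMs, int syncLimitTicks) {
        return Math.multiplyExact(tickTimeMs, syncLimitTicks);
    }
}
```

With these sample values the initial timeout is 2000 * 10 = 20000 ms and the steady-state timeout is 2000 * 5 = 10000 ms.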
LearnerHandler is the Leader-side representation of a follower

We skip the details for now and come back to it in the Follower section below.

Follower

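The section above covered part of the leader's initialization. Why not continue? Because from this point on it is easier to understand in combination with what the follower does, so we now turn to the follower's initialization.
After the leader is elected, every non-leader node sets itself to follower (observers are not covered here) and enters the followLeader phase. Below is the FOLLOWING branch of QuorumPeer on a follower node.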


                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        updateServerState();
                    }
                    break;

Creating the Follower

Creating a follower first creates a FollowerZooKeeperServer. FollowerZooKeeperServer extends ZooKeeperServer and represents the ZooKeeper instance on a follower node, but it has its own request processing chain, which I will analyze in detail when covering request processing.

       new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));

follower.followLeader

Once the Follower instance is created, it interacts with the Leader through followLeader().

 void followLeader() throws InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        long electionTimeTaken = self.end_fle - self.start_fle;
        self.setElectionTimeTaken(electionTimeTaken);
        ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
        LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
        self.start_fle = 0;
        self.end_fle = 0;
        fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);

        long connectionTime = 0;
        boolean completedSync = false;

        try {
            // Move the ZAB state from ELECTION to DISCOVERY
            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
            // Find the Leader node based on the election result
            QuorumServer leaderServer = findLeader();
            try {
                // Establish the socket connection to the Leader node
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                connectionTime = System.currentTimeMillis();
                // Register with the Leader and obtain the new epoch; from here on this node is a
                // legitimate follower of the ZooKeeper cluster
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                if (self.isReconfigStateChange()) {
                    throw new Exception("learned about role change");
                }
                //check to see if the leader zxid is lower than ours
                //this should never happen but is just a safety check
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch "
                              + ZxidUtils.zxidToString(newEpochZxid)
                              + " is less than our accepted epoch "
                              + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                long startTime = Time.currentElapsedTime();
                try {
                    // Record the leader's address and sid on the follower
                    self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
                    // Switch the ZAB state to SYNCHRONIZATION: the follower enters data synchronization
                    self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                    // Synchronize data with the leader
                    syncWithLeader(newEpochZxid);
                    // After synchronization the follower switches the ZAB state to BROADCAST, the last
                    // phase of the ZAB protocol; the node can now serve clients
                    self.setZabState(QuorumPeer.ZabState.BROADCAST);
                    completedSync = true;
                } finally {
                    long syncTime = Time.currentElapsedTime() - startTime;
                    ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
                }
                if (self.getObserverMasterPort() > 0) {
                    LOG.info("Starting ObserverMaster");

                    om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                    om.start();
                } else {
                    om = null;
                }
                // create a reusable packet to reduce gc impact todo: at this point follower is ready to serve
                QuorumPacket qp = new QuorumPacket();
                // From here on the follower loops, receiving messages from the leader and processing them
                while (this.isRunning()) {
                    // Receive a message from the leader
                    readPacket(qp);
                    // Process the message; message processing is analyzed in a later article
                    processPacket(qp);
                }
            } catch (Exception e) {
                LOG.warn("Exception when following the leader", e);
                closeSocket();

                // clear pending revalidations
                pendingRevalidations.clear();
            }
        } finally {
            if (om != null) {
                om.stop();
            }
            zk.unregisterJMX(this);

            if (connectionTime != 0) {
                long connectionDuration = System.currentTimeMillis() - connectionTime;
                LOG.info(
                    "Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
                    leaderAddr,
                    connectionDuration,
                    completedSync);
                messageTracker.dumpToLog(leaderAddr.toString());
            }
        }
    }
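followLeader() walks the follower through the ZAB states in a fixed order. The sketch below restates that order as a tiny state machine. The enum constants match the QuorumPeer.ZabState values used in the excerpt; the class `FollowerPhasesSketch` and its methods are hypothetical and only model the success path, not the error handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the ZAB state progression a follower goes through.
class FollowerPhasesSketch {
    enum ZabState { ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST }

    // Returns the state that follows `current` on the success path; BROADCAST is terminal.
    static ZabState next(ZabState current) {
        switch (current) {
            case ELECTION:
                return ZabState.DISCOVERY;        // leader found, connect and register
            case DISCOVERY:
                return ZabState.SYNCHRONIZATION;  // new epoch received, sync data
            default:
                return ZabState.BROADCAST;        // in sync, process leader packets
        }
    }

    // Lists every state from ELECTION to BROADCAST in order.
    static List<ZabState> successPath() {
        List<ZabState> path = new ArrayList<>();
        ZabState state = ZabState.ELECTION;
        path.add(state);
        while (state != ZabState.BROADCAST) {
            state = next(state);
            path.add(state);
        }
        return path;
    }
}
```

In the real method any exception drops the follower out of this sequence: the socket is closed and QuorumPeer's main loop decides what to do next.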
connectToLeader

The Follower connects to the Leader. Let's go straight to the implementation.


 protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {

        this.leaderAddr = multiAddr;
        Set<InetSocketAddress> addresses;
        if (self.isMultiAddressReachabilityCheckEnabled()) {
            // even if none of the addresses are reachable, we want to try to establish connection
            // see ZOOKEEPER-3758
            addresses = multiAddr.getAllReachableAddressesOrAll();
        } else {
            addresses = multiAddr.getAllAddresses();
        }
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        CountDownLatch latch = new CountDownLatch(addresses.size());
        AtomicReference<Socket> socket = new AtomicReference<>(null);
        // Create one LeaderConnector per leader address; LeaderConnector is the thread class that actually connects to the Leader
        addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);

        try {
            latch.await();
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while trying to connect to Leader", e);
        } finally {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                    LOG.error("not all the LeaderConnector terminated properly");
                }
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
            }
        }

        if (socket.get() == null) {
            throw new IOException("Failed connect to " + multiAddr);
        } else {
            sock = socket.get();
        }

        self.authLearner.authenticate(sock, hostname);
        // LeaderConnector has given us a socket connection to the Leader.
        // leaderIs wraps the input stream of the socket to the leader
        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        // leaderOs wraps the output stream of the socket to the leader
        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    }
LeaderConnector

LeaderConnector's job is to establish the follower's socket connection to the Leader. It is a thread class: once the socket to the Leader is established its work is done and its thread ends. The implementation is unremarkable, so it is not analyzed here.
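Since connectToLeader starts one connector per address and shares a single AtomicReference among them, one plausible way for the connectors to cooperate is "first successful connection wins". The sketch below illustrates that idea only; `FirstConnectionWinsSketch`, `connectToAny`, and the string "connections" are hypothetical, and no real sockets are opened.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of several connectors racing to publish one connection.
class FirstConnectionWinsSketch {
    // Tries every address in parallel and returns the connection that was published first.
    static String connectToAny(List<String> addresses) {
        AtomicReference<String> winner = new AtomicReference<>(null);
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        CountDownLatch latch = new CountDownLatch(addresses.size());
        for (String address : addresses) {
            executor.submit(() -> {
                try {
                    String connection = "conn->" + address; // stands in for a connected socket
                    // Only the first caller succeeds; a real connector would close
                    // its redundant socket when compareAndSet returns false.
                    winner.compareAndSet(null, connection);
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
        return winner.get(); // null would mean that no connector succeeded
    }
}
```

Which address wins depends on thread scheduling, so callers should only rely on the result being one of the candidates.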

Follower.registerWithLeader

The Follower registers its own information with the Leader.

 protected long registerWithLeader(int pktType) throws IOException {
        /*
         * Send follower info, including last zxid and sid
         */
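        // The follower reads the largest transaction id (zxid) it has logged from its database
        long lastLoggedZxid = self.getLastLoggedZxid();
        // QuorumPacket is the serialized message carrier between the Leader and its Followers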
        QuorumPacket qp = new QuorumPacket();
        qp.setType(pktType);
        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

        /*
         * Add sid to payload
         */
        // LearnerInfo describes this follower
        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
        // Serialize the follower's info (sid, protocol version, quorum verifier version) into the
        // ByteArrayOutputStream, then store it as the data of the QuorumPacket
        boa.writeRecord(li, "LearnerInfo");
        qp.setData(bsid.toByteArray());
        // Send the QuorumPacket to the leader over the socket
        writePacket(qp, true);
        // Wait for the leader's reply, which carries the new epoch
        readPacket(qp);
        // Extract the new epoch from the leader's reply
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        if (qp.getType() == Leader.LEADERINFO) {
            // we are connected to a 1.0 server so accept the new epoch and read the next packet
            leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
            byte[] epochBytes = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            if (newEpoch > self.getAcceptedEpoch()) {
                // Persist the new epoch to the local acceptedEpoch file: the follower is now part of the new round
                wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
                self.setAcceptedEpoch(newEpoch);
            } else if (newEpoch == self.getAcceptedEpoch()) {
                // The new epoch equals acceptedEpoch: do not ack it again
                // since we have already acked an epoch equal to the leaders, we cannot ack
                // again, but we still need to send our lastZxid to the leader so that we can
                // sync with it if it does assume leadership of the epoch.
                // the -1 indicates that this reply should not count as an ack for the new epoch
                wrappedEpochBytes.putInt(-1);
            } else {
                // The follower's acceptedEpoch is greater than the new epoch, i.e. this node is in a later
                // round than the one the new cluster proposes, so fail immediately
                throw new IOException("Leaders epoch, "
                                      + newEpoch
                                      + " is less than accepted epoch, "
                                      + self.getAcceptedEpoch());
            }
            // Send the leader the ack for the new epoch (carrying our last logged zxid and our current epoch)
            QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
            writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);
        } else {
            if (newEpoch > self.getAcceptedEpoch()) {
                self.setAcceptedEpoch(newEpoch);
            }
            if (qp.getType() != Leader.NEWLEADER) {
                LOG.error("First packet should have been NEWLEADER");
                throw new IOException("First packet should have been NEWLEADER");
            }
            return qp.getZxid();
        }
    }
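The LearnerInfo payload written by registerWithLeader consists of three fields: sid, protocol version, and config version. A minimal sketch of that layout follows, under the assumption that the binary archive writes a long as 8 big-endian bytes and an int as 4 big-endian bytes with no extra framing. The class `LearnerInfoPayloadSketch` and its methods are hypothetical; they mimic the layout with a plain ByteBuffer instead of using ZooKeeper's archive classes.

```java
import java.nio.ByteBuffer;

// Hypothetical mimic of the 20-byte LearnerInfo payload: long sid, int version, long configVersion.
class LearnerInfoPayloadSketch {
    // Packs the three fields in order; ByteBuffer is big-endian by default.
    static byte[] encode(long sid, int protocolVersion, long configVersion) {
        return ByteBuffer.allocate(20)
                .putLong(sid)
                .putInt(protocolVersion)
                .putLong(configVersion)
                .array();
    }

    // Returns {sid, protocolVersion, configVersion}; a field missing from a short
    // payload stays at -1, so older and shorter payloads can still be read.
    static long[] decode(byte[] data) {
        long[] fields = {-1L, -1L, -1L};
        ByteBuffer buf = ByteBuffer.wrap(data);
        if (data.length >= 8) {
            fields[0] = buf.getLong();
        }
        if (data.length >= 12) {
            fields[1] = buf.getInt();
        }
        if (data.length >= 20) {
            fields[2] = buf.getLong();
        }
        return fields;
    }
}
```

Reading by length thresholds (8, 12, 20) is what lets a receiver accept payloads from senders that only know about a prefix of the fields.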

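Let's pause here. Since the follower has sent a message to the leader, let's see how the leader side handles it.

As mentioned above, the leader creates a LearnerHandler thread to handle a follower's requests. Let's now look at LearnerHandler's run() method.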

LearnerHandler.run

LearnerHandler handles all communication with a follower. The code is long, so I go through it in parts. The first part receives the epoch sent by the follower.

 public void run() {
        try {
            // Add the LearnerHandler representing this follower to the learnerMaster's list of learners
            learnerMaster.addLearnerHandler(this);
            tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();
            // Initialize the input and output streams
            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);

            QuorumPacket qp = new QuorumPacket();
            // Read the first packet from the follower, which carries its epoch information
            ia.readRecord(qp, "packet");

            messageTracker.trackReceived(qp.getType());
            if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
                LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());

                return;
            }

            if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
                throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
            }
            // Get the message payload
            byte[] learnerInfoData = qp.getData();
            if (learnerInfoData != null) {
                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                if (learnerInfoData.length >= 8) {
                    // Read the follower's sid from the payload
                    this.sid = bbsid.getLong();
                }
                if (learnerInfoData.length >= 12) {
                    // Read the protocolVersion
                    this.version = bbsid.getInt(); // protocolVersion
                }
                if (learnerInfoData.length >= 20) {
                    // Read the configVersion
                    long configVersion = bbsid.getLong();
                    if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
                        throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                    }
                }
            } else {
                this.sid = learnerMaster.getAndDecrementFollowerCounter();
            }
            // Look up the follower's configured IP and port information by its sid
            String followerInfo = learnerMaster.getPeerInfo(this.sid);
            if (followerInfo.isEmpty()) {
                LOG.info(
                    "Follower sid: {} not in the current config {}",
                    this.sid,
                    Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
            } else {
                LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
            }

            if (qp.getType() == Leader.OBSERVERINFO) {
                learnerType = LearnerType.OBSERVER;
            }

            learnerMaster.registerLearnerHandlerBean(this, sock);
            // getEpochFromZxid extracts the epoch the follower sent
            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
            long peerLastZxid;
            StateSummary ss = null;
            long zxid = qp.getZxid();
            // learnerMaster.getEpochToPropose: the leader's QuorumPeer thread blocks in getEpochToPropose inside
            // lead() until more than half of the participants have arrived, and only then runs the rest of lead().
            // Each arriving follower, as long as it is in the same round, is added to the leader's set of connected
            // participants. Once the followers plus the leader form a majority, the leader's QuorumPeer thread stops
            // waiting and continues, and the new epoch is returned: the cluster has entered a new working round
            long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
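The waiting behaviour described in the comments above can be modelled as a small barrier. The following sketch is a simplification under stated assumptions, not the ZooKeeper implementation: `EpochBarrierSketch` is hypothetical, the quorum rule is a plain majority of a fixed ensemble size, every caller counts as a participant, and a timeout is reported as an IllegalStateException. Each caller proposes its last accepted epoch; the result is one more than the largest value seen by the time a majority has arrived.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of "wait for a majority, then agree on max(acceptedEpoch) + 1".
class EpochBarrierSketch {
    private final int ensembleSize;
    private final long timeoutMs;
    private final Set<Long> connected = new HashSet<>();
    private long epoch = -1;
    private boolean decided = false;

    EpochBarrierSketch(int ensembleSize, long timeoutMs) {
        this.ensembleSize = ensembleSize;
        this.timeoutMs = timeoutMs;
    }

    synchronized long getEpochToPropose(long sid, long lastAcceptedEpoch) {
        if (decided) {
            return epoch;                      // late arrivals just read the result
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;     // stay ahead of every epoch seen so far
        }
        connected.add(sid);
        if (connected.size() > ensembleSize / 2) {
            decided = true;                    // majority reached: release all waiters
            notifyAll();
            return epoch;
        }
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!decided) {
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                throw new IllegalStateException("Timeout while waiting for a quorum");
            }
            try {
                wait(remainingMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ie);
            }
        }
        return epoch;
    }

    // Two of three servers arrive with accepted epochs 4 and 6; both should get 7.
    static long[] demo() {
        EpochBarrierSketch barrier = new EpochBarrierSketch(3, 5_000);
        long[] results = new long[2];
        Thread leader = new Thread(() -> results[0] = barrier.getEpochToPropose(1L, 4L));
        leader.start();
        results[1] = barrier.getEpochToPropose(2L, 6L);
        try {
            leader.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return results;
    }
}
```

Whichever of the two callers arrives first, both end up with epoch 7, because the epoch is only read after the majority has been reached.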

Part 2: the Leader sends the newly generated epoch to the follower

            // Build the leader's new zxid from the newly generated epoch
          long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

            if (this.getVersion() < 0x10000) {
                // we are going to have to extrapolate the epoch information
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                // fake the message
                learnerMaster.waitForEpochAck(this.getSid(), ss);
            } else {
                byte[] ver = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                // Build the packet that carries the leader's new zxid to the follower
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
                // Send the packet over the socket
                oa.writeRecord(newEpochPacket, "packet");
                messageTracker.trackSent(Leader.LEADERINFO);
                bufferedOutput.flush();
                // Read the follower's ack for the leader's newly generated epoch
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                messageTracker.trackReceived(ackEpochPacket.getType());
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
                    return;
                }
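Both sides of the handshake move the epoch around inside a zxid, so it helps to see the packing spelled out. The sketch below assumes the usual layout, with the epoch in the upper 32 bits and a per-epoch counter in the lower 32 bits, which is consistent with the `& 0xffffffffL` checks in the excerpts. `ZxidSketch` is a hypothetical stand-in for ZxidUtils, not the real class.

```java
// Hypothetical stand-in for ZxidUtils: epoch in the high 32 bits, counter in the low 32 bits.
class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }
}
```

For example, epoch 5 with counter 0 gives 0x500000000, which is the kind of value a new leader starts its epoch with, and 0x500000007 is the seventh transaction of that epoch.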

Now let's return to the Leader's lead() method and see what happens after the main thread returns from getEpochToPropose.

            // Get the newly generated epoch
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

            // Build the zxid from the epoch
            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

            synchronized (this) {
                lastProposed = zk.getZxid();
            }
            // Build the packet that announces this node as the leader
            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

            if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid()));
            }

            QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
            QuorumVerifier curQV = self.getQuorumVerifier();
            if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
               
                try {
                    LOG.debug(String.format("set lastSeenQuorumVerifier to currentQuorumVerifier (%s)", curQV.toString()));
                    QuorumVerifier newQV = self.configFromString(curQV.toString());
                    newQV.setVersion(zk.getZxid());
                    self.setLastSeenQuorumVerifier(newQV, true);
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }

            newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
            if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }

            // Wait for acks of the new epoch from more than half of the participants
            waitForEpochAck(self.getId(), leaderStateSummary);

At this point the leader is waiting for more than half of the participants to ack the new epoch.

Let's go back to LearnerHandler and see what happens once it has read the follower's ack for the new epoch.

 ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
 ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
 // On receiving the follower's ack for the new epoch, call the leader's waitForEpochAck method
 learnerMaster.waitForEpochAck(this.getSid(), ss);
waitForEpochAck

Let's analyze waitForEpochAck. The code is fairly simple: it checks whether more than half of the participants have acked the new epoch. If they have, the new epoch takes effect across the whole cluster and becomes the legitimate, agreed-upon epoch for this round.

 public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
    // electingFollowers holds the sids of the participants that have already acked the new epoch
        synchronized (electingFollowers) {
            if (electionFinished) {
                return;
            }
            if (ss.getCurrentEpoch() != -1) {
                if (ss.isMoreRecentThan(leaderStateSummary)) {
                    throw new IOException("Follower is ahead of the leader, leader summary: "
                                          + leaderStateSummary.getCurrentEpoch()
                                          + " (current epoch), "
                                          + leaderStateSummary.getLastZxid()
                                          + " (last zxid)");
                }
                if (ss.getLastZxid() != -1 && isParticipant(id)) {
                    electingFollowers.add(id);
                }
            }
            QuorumVerifier verifier = self.getQuorumVerifier();
            // If more than half of the participants have acked the new epoch, this round of election is officially complete
            if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
                electionFinished = true;
                electingFollowers.notifyAll();
            } else {
                // Otherwise the thread waits until more than half of the participants have acked the new epoch
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.getInitLimit() * self.getTickTime();
                while (!electionFinished && cur < end) {
                    electingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (!electionFinished) {
                    throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
                }
            }
        }
    }
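The condition that ends the wait combines two checks: the leader's own ack must be in the set, and the set must form a quorum. A minimal sketch of that condition follows, assuming a simple majority rule over a fixed number of voting members. `MajorityQuorumSketch` and its methods are hypothetical; the real QuorumVerifier may apply other rules, for example weighted or hierarchical quorums.

```java
import java.util.Set;

// Hypothetical majority-based version of the "epoch accepted" condition.
class MajorityQuorumSketch {
    // True when strictly more than half of the voting members are in ackSet.
    static boolean containsQuorum(Set<Long> ackSet, int votingMembers) {
        return ackSet.size() > votingMembers / 2;
    }

    // The epoch counts as accepted only if the leader itself has acked
    // and the acks, the leader's included, form a majority.
    static boolean epochAccepted(Set<Long> ackSet, long leaderSid, int votingMembers) {
        return ackSet.contains(leaderSid) && containsQuorum(ackSet, votingMembers);
    }
}
```

In a five-node ensemble led by sid 1, acks from {1, 2} are not enough, acks from {1, 2, 3} are, and acks from {2, 3, 4} still fail the check because the leader's own ack is missing.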

Back on the follower side: after sending its ack for the new epoch, the follower enters the ZAB synchronization phase. Let's look at the follower-side implementation of syncWithLeader. This method is also long, so we analyze it in parts.


  protected void syncWithLeader(long newLeaderZxid) throws Exception {
        // Pre-build the ACK packet that is sent back to the leader later in the synchronization
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

        QuorumVerifier newLeaderQV = null;

        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
        // For SNAP and TRUNC the snapshot is needed to save that history
        boolean snapshotNeeded = true;
        boolean syncSnapshot = false;
        // Read the data synchronization command from the leader
        readPacket(qp);

Now back to LearnerHandler: once a quorum of participants has accepted the new epoch, LearnerHandler enters syncFollower.

LearnerHandler syncFollower

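syncFollower decides how to bring the follower's data up to date by comparing the follower's last zxid (f_zxid, called peerLastZxid in the code) with the zxids the leader has already processed.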
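
Since everything in this method comes down to comparing zxids, it helps to keep the zxid layout in mind: the high 32 bits hold the epoch and the low 32 bits hold a counter. The helper class below is a small illustrative sketch (the name `ZxidSketch` is mine); it mirrors the bit arithmetic used for `isPeerNewEpochZxid` in the code that follows.

```java
// Illustrative helpers for the 64-bit zxid layout: high 32 bits = epoch, low 32 bits = counter.
final class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // A zxid whose counter is 0 marks the start of an epoch; no transaction carries it.
    static boolean isNewEpochZxid(long zxid) {
        return counterOf(zxid) == 0;
    }

    public static void main(String[] args) {
        long first = makeZxid(5, 0);
        long later = makeZxid(5, 7);
        System.out.println(Long.toHexString(first) + " newEpoch=" + isNewEpochZxid(first));
        System.out.println(Long.toHexString(later) + " newEpoch=" + isNewEpochZxid(later));
    }
}
```

Because the epoch sits in the high bits, comparing two zxids as plain longs orders them by epoch first and by counter second.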

boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) {
        /*
         * When leader election is completed, the leader will set its
         * lastProcessedZxid to be (epoch << 32). There will be no txn associated
         * with this zxid.
         *
         * The learner will set its lastProcessedZxid to the same value if
         * it get DIFF or SNAP from the learnerMaster. If the same learner come
         * back to sync with learnerMaster using this zxid, we will never find this
         * zxid in our history. In this case, we will ignore TRUNC logic and
         * always send DIFF if we have old enough history
         */
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        // Keep track of the latest zxid which already queued
        long currentZxid = peerLastZxid;
        boolean needSnap = true;
        // Get the leader's ZK database
        ZKDatabase db = learnerMaster.getZKDatabase();
        boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
        ReentrantReadWriteLock lock = db.getLogLock();
        ReadLock rl = lock.readLock();
        try {
            rl.lock();
            // Largest zxid the leader has committed (newest entry of the in-memory committedLog)
            long maxCommittedLog = db.getmaxCommittedLog();
            // Smallest zxid the leader has committed that is still kept in the committedLog queue
            long minCommittedLog = db.getminCommittedLog();

            // Latest zxid the leader has processed and that has been acked.
            // If maxCommittedLog > lastProcessedZxid, the leader still has transactions
            // in its log that the other servers in the ensemble have not acked.
            long lastProcessedZxid = db.getDataTreeLastProcessedZxid();

            LOG.info("Synchronizing with Learner sid: {} maxCommittedLog=0x{}"
                     + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
                     + " peerLastZxid=0x{}",
                     getSid(),
                     Long.toHexString(maxCommittedLog),
                     Long.toHexString(minCommittedLog),
                     Long.toHexString(lastProcessedZxid),
                     Long.toHexString(peerLastZxid));

            if (db.getCommittedLog().isEmpty()) {
                /*
                 * It is possible that committedLog is empty. In that case
                 * setting these value to the latest txn in learnerMaster db
                 * will reduce the case that we need to handle
                 *
                 * Here is how each case handle by the if block below
                 * 1. lastProcessZxid == peerZxid -> Handle by (2)
                 * 2. lastProcessZxid < peerZxid -> Handle by (3)
                 * 3. lastProcessZxid > peerZxid -> Handle by (5)
                 */
                minCommittedLog = lastProcessedZxid;
                maxCommittedLog = lastProcessedZxid;
            }

            /*
             * The English comment below lists the sync scenarios syncFollower has to handle;
             * the "Note:" lines are annotations added in this walkthrough.
             *
             * Here are the cases that we want to handle
             *
             * 1. Force sending snapshot (for testing purpose)
             * 2. Peer and learnerMaster is already sync, send empty diff
             *    Note: the follower's zxid equals the leader's, so the follower only gets an empty DIFF.
             * 3. Follower has txn that we haven't seen. This may be old leader
             *    so we need to send TRUNC. However, if peer has newEpochZxid,
             *    we cannot send TRUNC since the follower has no txnlog
             *    Note: the follower's zxid is larger than the leader's, so TRUNC tells it to delete
             *    the transactions past that zxid. A newEpochZxid means the follower has not
             *    processed any transaction yet, so TRUNC cannot be used.
             * 4. Follower is within committedLog range or already in-sync.
             *    We may need to send DIFF or TRUNC depending on follower's zxid
             *    We always send empty DIFF if follower is already in-sync
             *    Note: the follower's zxid falls inside the leader's committedLog range, so a DIFF is sent.
             * 5. Follower missed the committedLog. We will try to use on-disk
             *    txnlog + committedLog to sync with follower. If that fail,
             *    we will send snapshot
             *    Note: the follower's zxid is below minCommittedLog, so the leader replays its on-disk
             *    txnlog plus the committedLog; if that fails, it falls back to a SNAP sync.
             */
            // The branches below implement the five sync rules listed above

            if (forceSnapSync) {
                // Force learnerMaster to use snapshot to sync with follower
                LOG.warn("Forcing snapshot sync - should not see this in production");
            } else if (lastProcessedZxid == peerLastZxid) {
                // Follower is already sync with us, send empty diff
                // The follower's zxid equals the leader's: send only a DIFF instruction, with no data after it
                LOG.info(
                    "Sending DIFF zxid=0x{} for peer sid: {}",
                    Long.toHexString(peerLastZxid),
                    getSid());
                queueOpPacket(Leader.DIFF, peerLastZxid);
                needOpPacket = false;
                needSnap = false;
            } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
                // If the follower's zxid is larger than the largest zxid the leader has committed, send TRUNC:
                // the follower must delete the extra transactions and keep only those up to maxCommittedLog
                // Newer than committedLog, send trunc and done
                LOG.debug(
                    "Sending TRUNC to follower zxidToSend=0x{} for peer sid:{}",
                    Long.toHexString(maxCommittedLog),
                    getSid());
                queueOpPacket(Leader.TRUNC, maxCommittedLog);
                currentZxid = maxCommittedLog;
                needOpPacket = false;
                needSnap = false;
            } else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
                // If the follower's fzxid lies between minCommittedLog and maxCommittedLog, the follower is behind the leader
                // and the leader has to send it the transactions in (fzxid, maxCommittedLog]
                // Follower is within commitLog range
                LOG.info("Using committedLog for peer sid: {}", getSid());
                Iterator<Proposal> itr = db.getCommittedLog().iterator();
                // Put the transactions the follower is missing into the queue of packets to sync
                currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
                needSnap = false;
            } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
                // Use txnlog and committedLog to sync
                // If the follower's fzxid is below minCommittedLog, the follower is far behind and needs the transactions in (fzxid, maxCommittedLog].
                // Part of them, [minCommittedLog, maxCommittedLog], is cached in memory in committedLog; the rest, (fzxid, minCommittedLog),
                // lives in the on-disk txn log, so the data has to be recovered from both sources

                // Calculate sizeLimit that we allow to retrieve txnlog from disk
                long sizeLimit = db.calculateTxnLogSizeLimit();
                // This method can return empty iterator if the requested zxid
                // is older than on-disk txnlog
                // Get an iterator over the on-disk txn log, based on fzxid and sizeLimit
                Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
                if (txnLogItr.hasNext()) {
                    LOG.info("Use txnlog and committedLog for peer sid: {}", getSid());
                    // Queue the proposals read from the txn log; the return value is the last zxid that was queued
                    currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);

                    if (currentZxid < minCommittedLog) {
                        // If currentZxid is still below minCommittedLog, there is a gap between the txn log and committedLog,
                        // so send the follower a snapshot instead of a DIFF
                        LOG.info(
                            "Detected gap between end of txnlog: 0x{} and start of committedLog: 0x{}",
                            Long.toHexString(currentZxid),
                            Long.toHexString(minCommittedLog));
                        currentZxid = peerLastZxid;
                        // Clear out currently queued requests and revert
                        // to sending a snapshot.
                        queuedPackets.clear();
                        needOpPacket = true;
                    } else {
                        LOG.debug("Queueing committedLog 0x{}", Long.toHexString(currentZxid));
               
                        Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
                        // queueCommittedProposals normally queues every logged transaction whose zxid is greater than fzxid.
                        // There are special cases, though: if fzxid itself is found in the log, a DIFF is sent first;
                        // if fzxid is not in the log (the follower has a transaction the leader never saw), a TRUNC is sent.
                        // If the next logged zxid is greater than fzxid but belongs to a different epoch,
                        // the follower needs a full SNAP sync instead.
                        currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
                        needSnap = false;
                    }
                }
                // closing the resources
                if (txnLogItr instanceof TxnLogProposalIterator) {
                    TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
                    txnProposalItr.close();
                }
            } else {
                LOG.warn(
                    "Unhandled scenario for peer sid: {} maxCommittedLog=0x{}"
                        + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
                        + " peerLastZxid=0x{} txnLogSyncEnabled={}",
                    getSid(),
                    Long.toHexString(maxCommittedLog),
                    Long.toHexString(minCommittedLog),
                    Long.toHexString(lastProcessedZxid),
                    Long.toHexString(peerLastZxid),
                    txnLogSyncEnabled);
            }
            if (needSnap) {
                currentZxid = db.getDataTreeLastProcessedZxid();
            }

            LOG.debug("Start forwarding 0x{} for peer sid: {}", Long.toHexString(currentZxid), getSid());
            leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
        } finally {
            rl.unlock();
        }

        if (needOpPacket && !needSnap) {
            // Fall back to sending the follower a snapshot for a full data restore
            // This should never happen, but we should fall back to sending
            // snapshot just in case.
            LOG.error("Unhandled scenario for peer sid: {} fall back to use snapshot",  getSid());
            needSnap = true;
        }
        // Tell the caller whether a SNAP sync is needed
        return needSnap;
    }
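
The branch order of syncFollower is easier to remember once it is reduced to a pure function. The sketch below is an assumption-laden simplification of my own (`SyncDecisionSketch` and its `Mode` enum are hypothetical): it reproduces only the top-level if/else chain and ignores the inner fallbacks, such as a gap in the txn log or a TRUNC issued from inside queueCommittedProposals.

```java
// Hypothetical condensation of the top-level branches of syncFollower; inner fallbacks are ignored.
final class SyncDecisionSketch {
    enum Mode { SNAP, EMPTY_DIFF, TRUNC, DIFF_FROM_COMMITTED_LOG, TXNLOG_THEN_COMMITTED_LOG }

    static Mode choose(long peerLastZxid, long minCommittedLog, long maxCommittedLog,
                       long lastProcessedZxid, boolean committedLogEmpty,
                       boolean txnLogSyncEnabled, boolean forceSnapSync) {
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        if (committedLogEmpty) {
            // Same normalization as the original: an empty committedLog collapses the range.
            minCommittedLog = lastProcessedZxid;
            maxCommittedLog = lastProcessedZxid;
        }
        if (forceSnapSync) {
            return Mode.SNAP;
        } else if (lastProcessedZxid == peerLastZxid) {
            return Mode.EMPTY_DIFF;
        } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            return Mode.TRUNC;
        } else if (minCommittedLog <= peerLastZxid && peerLastZxid <= maxCommittedLog) {
            return Mode.DIFF_FROM_COMMITTED_LOG;
        } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
            return Mode.TXNLOG_THEN_COMMITTED_LOG; // the real code may still fall back to SNAP
        }
        return Mode.SNAP;
    }

    public static void main(String[] args) {
        long min = 0x100000003L;
        long max = 0x100000008L;
        long[] peers = {0x100000008L, 0x100000009L, 0x100000005L, 0x100000001L, 0x200000000L};
        for (long peer : peers) {
            System.out.println("peer=0x" + Long.toHexString(peer) + " -> "
                + choose(peer, min, max, max, false, true, false));
        }
    }
}
```

The last peer in `main` is a new-epoch zxid that is ahead of the leader: TRUNC is skipped for it, no other branch matches, and the result is a snapshot.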

Back in the main thread of LearnerHandler, let's see what happens once it knows whether the follower's data has to be restored from a snapshot.

 // syncFollower, analyzed above, returns whether the follower has to be restored from a snapshot.
 // If needSnap is false, the leader has already queued what the follower needs (for example the DIFF transactions) in queuedPackets.
 boolean needSnap = syncFollower(peerLastZxid, learnerMaster);

            // syncs between followers and the leader are exempt from throttling because it
            // is important to keep the state of quorum servers up-to-date. The exempted syncs
            // are counted as concurrent syncs though
            // In other words, only observers are throttled while syncing
            boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
            /* if we are not truncating or sending a diff just send a snapshot */
            if (needSnap) {
                // needSnap is true: a snapshot will be sent.
                // syncThrottler limits how many syncs run concurrently
                syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
                syncThrottler.beginSync(exemptFromThrottle);
                ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
                try {
                    // Latest zxid the leader has processed
                    long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                    // Send the SNAP sync instruction to the follower
                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                    messageTracker.trackSent(Leader.SNAP);
                    bufferedOutput.flush();

                    LOG.info(
                        "Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                            + "send zxid of db as 0x{}, {} concurrent snapshot sync, "
                            + "snapshot sync was {} from throttle",
                        Long.toHexString(peerLastZxid),
                        Long.toHexString(leaderLastZxid),
                        Long.toHexString(zxidToSend),
                        syncThrottler.getSyncInProgress(),
                        exemptFromThrottle ? "exempt" : "not exempt");
                    // Dump data to peer
                    // The leader serializes its local database and sends it to the follower
                    learnerMaster.getZKDatabase().serializeSnapshot(oa);
                    oa.writeString("BenWasHere", "signature");
                    // Flush the data to the network
                    bufferedOutput.flush();
                } finally {
                    ServerMetrics.getMetrics().SNAP_COUNT.add(1);
                }
            } else {
                syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
                syncThrottler.beginSync(exemptFromThrottle);
                ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
                ServerMetrics.getMetrics().DIFF_COUNT.add(1);
            }

            LOG.debug("Sending NEWLEADER message to {}", sid);
            // Next, the leader tells the follower "I am the leader" (NEWLEADER) and waits for the follower's ack.
            // the version of this quorumVerifier will be set by leader.lead() in case
            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
            // we got here, so the version was set
            if (getVersion() < 0x10000) {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();

            // Start thread that blast packets in the queue to learner
            // A dedicated thread sends the packets buffered in queuedPackets to the follower
            startSendingPackets();

            // Read the ack sent back by the follower
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");

            messageTracker.trackReceived(qp.getType());
            if (qp.getType() != Leader.ACK) {
                LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
                return;
            }

            LOG.debug("Received NEWLEADER-ACK message from {}", sid);
            // Wait until a quorum of participants has acked the leader's NEWLEADER.
            // Meanwhile the leader's QuorumPeer thread is also waiting for the followers to ack its NEWLEADER
            learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());

            syncLimitCheck.start();
            // sync ends when NEWLEADER-ACK is received
            syncThrottler.endSync();
            if (needSnap) {
                ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
            } else {
                ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
            }
            syncThrottler = null;

            // now that the ack has been processed expect the syncLimit
            sock.setSoTimeout(learnerMaster.syncTimeout());

            /*
             * Wait until learnerMaster starts up
             */
            // Once a quorum of participants has acked NEWLEADER, LearnerHandler waits for the leader's transaction-processing engine to start
            learnerMaster.waitForStartup();
            // Once the leader's server is up, the code that follows handles regular requests; we analyze it later
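
The beginSync / endSync pair in the code above is essentially a counter with an exemption flag. Here is a minimal sketch of that idea, assuming a hypothetical `SyncThrottleSketch` class and an `IllegalStateException` on rejection; it is not the real throttler class, only an illustration of why followers always get in while observers can be turned away.

```java
// Hypothetical throttler: exempt syncs always start, but they still count as in-progress syncs.
final class SyncThrottleSketch {
    private final int maxConcurrentSyncs;
    private int syncInProgress = 0;

    SyncThrottleSketch(int maxConcurrentSyncs) {
        this.maxConcurrentSyncs = maxConcurrentSyncs;
    }

    // exempt = true for followers, false for observers (as in exemptFromThrottle above).
    synchronized void beginSync(boolean exempt) {
        if (!exempt && syncInProgress >= maxConcurrentSyncs) {
            throw new IllegalStateException("too many concurrent syncs: " + syncInProgress);
        }
        syncInProgress++;
    }

    synchronized void endSync() {
        if (syncInProgress <= 0) {
            throw new IllegalStateException("endSync called without a matching beginSync");
        }
        syncInProgress--;
    }

    synchronized int getSyncInProgress() {
        return syncInProgress;
    }

    public static void main(String[] args) {
        SyncThrottleSketch throttler = new SyncThrottleSketch(1);
        throttler.beginSync(false);     // first observer is admitted
        throttler.beginSync(true);      // a follower is admitted even though the limit is reached
        try {
            throttler.beginSync(false); // second observer is rejected
        } catch (IllegalStateException e) {
            System.out.println("observer throttled: " + e.getMessage());
        }
        throttler.endSync();
        throttler.endSync();
        System.out.println("in progress: " + throttler.getSyncInProgress());
    }
}
```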
Follower.syncWithLeader

Now back to the follower, to see what happens in the rest of syncWithLeader. This method is also very long, so we first look at the data-sync part.

 protected void syncWithLeader(long newLeaderZxid) throws Exception {
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

        QuorumVerifier newLeaderQV = null;

        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
        // For SNAP and TRUNC the snapshot is needed to save that history
        boolean snapshotNeeded = true;
        boolean syncSnapshot = false;
        // Read the data-sync instruction sent by the leader
        readPacket(qp);
        Deque<Long> packetsCommitted = new ArrayDeque<>();
        Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
        synchronized (zk) {
            if (qp.getType() == Leader.DIFF) {
                // The leader sent a DIFF sync instruction
                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                self.setSyncMode(QuorumPeer.SyncMode.DIFF);
                snapshotNeeded = false;
            } else if (qp.getType() == Leader.SNAP) {
                // The leader sent a SNAP sync instruction

                self.setSyncMode(QuorumPeer.SyncMode.SNAP);
                LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
                // The leader is going to dump the database
                // db is clear as part of deserializeSnapshot()
                // The follower rebuilds its local ZK database by deserializing the snapshot stream sent by the leader
                zk.getZKDatabase().deserializeSnapshot(leaderIs);
                // ZOOKEEPER-2819: overwrite config node content extracted
                // from leader snapshot with local config, to avoid potential
                // inconsistency of config node content during rolling restart.
                if (!self.isReconfigEnabled()) {
                    LOG.debug("Reset config node content from local config after deserialization of snapshot.");
                    zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
                }
                String signature = leaderIs.readString("signature");
                if (!signature.equals("BenWasHere")) {
                    LOG.error("Missing signature. Got {}", signature);
                    throw new IOException("Missing signature");
                }
                // Set the last processed zxid of the local database
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

                // immediately persist the latest snapshot when there is txn log gap
                syncSnapshot = true;
            } else if (qp.getType() == Leader.TRUNC) {
                // TRUNC received from the leader: delete the transactions that are no longer valid
                //we need to truncate the log to the lastzxid of the leader
                self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
                LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
                // Remove the invalid transactions from the log
                boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
                if (!truncated) {
                    // not able to truncate the log
                    LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
                    ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
                }
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

            } else {
                LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
                ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
            }
            // Initialize the value of the /zookeeper/config node in the local ZK database
            zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
            // Create the LearnerSessionTracker; what a SessionTracker does was covered in an earlier source-code walkthrough
            zk.createSessionTracker();

            long lastQueued = 0;

            // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
            // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
            // we need to make sure that we don't take the snapshot twice.
            boolean isPreZAB1_0 = true;
            //If we are not going to take the snapshot be sure the transactions are not applied in memory
            // but written out to the transaction log
            boolean writeToTxnLog = !snapshotNeeded;
            TxnLogEntry logEntry;
            // we are now going to start getting transactions to apply followed by an UPTODATE
            // This loop handles what the leader sends to restore the follower's data: proposals (with their commits), NEWLEADER and UPTODATE
            outerLoop:
            while (self.isRunning()) {
                // Wait for the next message from the leader; UPTODATE is the last message of the sync phase
                readPacket(qp);
                switch (qp.getType()) {
                case Leader.PROPOSAL:
                    PacketInFlight pif = new PacketInFlight();
                    logEntry = SerializeUtils.deserializeTxn(qp.getData());
                    pif.hdr = logEntry.getHeader();
                    pif.rec = logEntry.getTxn();
                    pif.digest = logEntry.getDigest();
                    if (pif.hdr.getZxid() != lastQueued + 1) {
                        LOG.warn(
                            "Got zxid 0x{} expected 0x{}",
                            Long.toHexString(pif.hdr.getZxid()),
                            Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = pif.hdr.getZxid();

                    if (pif.hdr.getType() == OpCode.reconfig) {
                        SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
                        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
                        self.setLastSeenQuorumVerifier(qv, true);
                    }

                    packetsNotCommitted.add(pif);
                    break;
                // Normally every PROPOSAL is followed by a COMMIT
                case Leader.COMMIT:
                case Leader.COMMITANDACTIVATE:
                    pif = packetsNotCommitted.peekFirst();
                    if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
                        boolean majorChange = self.processReconfig(
                            qv,
                            ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
                            true);
                        if (majorChange) {
                            throw new Exception("changes proposed in reconfig");
                        }
                    }
                    if (!writeToTxnLog) {
                        if (pif.hdr.getZxid() != qp.getZxid()) {
                            LOG.warn(
                                "Committing 0x{}, but next proposal is 0x{}",
                                Long.toHexString(qp.getZxid()),
                                Long.toHexString(pif.hdr.getZxid()));
                        } else {
                            // Apply the proposal to the local database
                            zk.processTxn(pif.hdr, pif.rec);
                            packetsNotCommitted.remove();
                        }
                    } else {
                        packetsCommitted.add(qp.getZxid());
                    }
                    break;
                case Leader.INFORM:
                case Leader.INFORMANDACTIVATE:
                    PacketInFlight packet = new PacketInFlight();

                    if (qp.getType() == Leader.INFORMANDACTIVATE) {
                        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
                        long suggestedLeaderId = buffer.getLong();
                        byte[] remainingdata = new byte[buffer.remaining()];
                        buffer.get(remainingdata);
                        logEntry = SerializeUtils.deserializeTxn(remainingdata);
                        packet.hdr = logEntry.getHeader();
                        packet.rec = logEntry.getTxn();
                        packet.digest = logEntry.getDigest();
                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData()));
                        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
                        if (majorChange) {
                            throw new Exception("changes proposed in reconfig");
                        }
                    } else {
                        logEntry = SerializeUtils.deserializeTxn(qp.getData());
                        packet.rec = logEntry.getTxn();
                        packet.hdr = logEntry.getHeader();
                        packet.digest = logEntry.getDigest();
                        // Log warning message if txn comes out-of-order
                        if (packet.hdr.getZxid() != lastQueued + 1) {
                            LOG.warn(
                                "Got zxid 0x{} expected 0x{}",
                                Long.toHexString(packet.hdr.getZxid()),
                                Long.toHexString(lastQueued + 1));
                        }
                        lastQueued = packet.hdr.getZxid();
                    }
                    if (!writeToTxnLog) {
                        // Apply to db directly if we haven't taken the snapshot
                        zk.processTxn(packet.hdr, packet.rec);
                    } else {
                        packetsNotCommitted.add(packet);
                        packetsCommitted.add(qp.getZxid());
                    }

                    break;
                case Leader.UPTODATE:
                    // UPTODATE received from the leader
                    LOG.info("Learner received UPTODATE message");
                    if (newLeaderQV != null) {
                        boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
                        if (majorChange) {
                            throw new Exception("changes proposed in reconfig");
                        }
                    }
                    if (isPreZAB1_0) {
                        zk.takeSnapshot(syncSnapshot);
                        self.setCurrentEpoch(newEpoch);
                    }
                   
                    self.setZooKeeperServer(zk);
                    self.adminServer.setZooKeeperServer(zk);
                    // Leave the loop: data sync is complete and the local ZK server is about to start
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
                    // means this is Zab 1.0
                    // NEWLEADER received from the leader
                    LOG.info("Learner received NEWLEADER message");
                    if (qp.getData() != null && qp.getData().length > 1) {
                        try {
                            // Extract the ensemble membership (quorum verifier) from the leader's message
                            QuorumVerifier qv = self.configFromString(new String(qp.getData()));
                            self.setLastSeenQuorumVerifier(qv, true);
                            newLeaderQV = qv;
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                    if (snapshotNeeded) {
                        zk.takeSnapshot(syncSnapshot);
                    }
                    // Set currentEpoch
                    self.setCurrentEpoch(newEpoch);
                    writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                    isPreZAB1_0 = false;
                    // Ack the leader's NEWLEADER
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }
        // Send the ack for the leader's UPTODATE
        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
        writePacket(ack, true);
        sock.setSoTimeout(self.tickTime * self.syncLimit);
        self.setSyncMode(QuorumPeer.SyncMode.NONE);
        // Start the follower's execution engine
        zk.startup();
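
The interplay between `writeToTxnLog`, `packetsNotCommitted` and `packetsCommitted` in the loop above is easy to lose track of, so here is a stripped-down sketch. It is only a model under my own assumptions: `SyncReplaySketch` is a hypothetical class, transactions are reduced to bare zxids, and INFORM, reconfig and snapshot handling are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of how the follower treats PROPOSAL / COMMIT before and after NEWLEADER.
final class SyncReplaySketch {
    private final Deque<Long> notCommitted = new ArrayDeque<>();    // proposals seen so far
    private final Deque<Long> committedForLog = new ArrayDeque<>(); // commits deferred until startup
    private final List<Long> appliedInMemory = new ArrayList<>();   // applied straight to the db
    private boolean writeToTxnLog;

    SyncReplaySketch(boolean snapshotNeeded) {
        // Same rule as the original: DIFF (no snapshot needed) goes through the txn log from the start.
        this.writeToTxnLog = !snapshotNeeded;
    }

    void onProposal(long zxid) {
        notCommitted.add(zxid);
    }

    void onCommit(long zxid) {
        if (!writeToTxnLog) {
            Long head = notCommitted.peekFirst();
            if (head != null && head == zxid) {
                appliedInMemory.add(notCommitted.remove());
            } // a mismatch is only logged as a warning in the original
        } else {
            committedForLog.add(zxid);
        }
    }

    void onNewLeader() {
        writeToTxnLog = true; // everything after NEWLEADER goes to the transaction log
    }

    List<Long> applied() { return new ArrayList<>(appliedInMemory); }
    List<Long> pendingProposals() { return new ArrayList<>(notCommitted); }
    List<Long> deferredCommits() { return new ArrayList<>(committedForLog); }

    public static void main(String[] args) {
        SyncReplaySketch snapSync = new SyncReplaySketch(true);
        snapSync.onProposal(1L);
        snapSync.onCommit(1L);  // applied in memory, the snapshot will capture it
        snapSync.onNewLeader();
        snapSync.onProposal(2L);
        snapSync.onCommit(2L);  // kept for the txn log and committed after startup
        System.out.println("applied=" + snapSync.applied()
            + " pending=" + snapSync.pendingProposals()
            + " deferred=" + snapSync.deferredCommits());
    }
}
```

The null check on `head` is a liberty the sketch takes; the original dereferences the result of peekFirst directly.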

Follower execution engine startup

After receiving UPTODATE from the leader, the follower starts its execution engine: it initializes the request processor chain and starts the sessionTracker.

protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
        syncProcessor.start();
    }
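
setupRequestProcessors wires two separate chains: one for client requests (FollowerRequestProcessor, CommitProcessor, FinalRequestProcessor) and one for proposals coming from the leader (SyncRequestProcessor, SendAckRequestProcessor). The sketch below shows just the wiring, using hypothetical synchronous stages (`StageSketch`, `NamedStage`) that record the order in which a request passes through them; the real processors are mostly queue-backed threads.

```java
import java.util.ArrayList;
import java.util.List;

// Wires two hypothetical chains that mirror the follower's processor order.
final class FollowerChainSketch {
    // Client-request path: FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
    static StageSketch clientPath() {
        StageSketch finalStage = new NamedStage("FinalRequestProcessor", null);
        StageSketch commitStage = new NamedStage("CommitProcessor", finalStage);
        return new NamedStage("FollowerRequestProcessor", commitStage);
    }

    // Proposal path: SyncRequestProcessor -> SendAckRequestProcessor
    static StageSketch proposalPath() {
        StageSketch sendAck = new NamedStage("SendAckRequestProcessor", null);
        return new NamedStage("SyncRequestProcessor", sendAck);
    }

    static List<String> run(StageSketch first, String request) {
        List<String> trace = new ArrayList<>();
        first.process(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(clientPath(), "create /a"));
        System.out.println(run(proposalPath(), "proposal"));
    }
}

// Hypothetical synchronous stand-in for a request processor.
interface StageSketch {
    void process(String request, List<String> trace);
}

final class NamedStage implements StageSketch {
    private final String name;
    private final StageSketch next; // null marks the end of the chain

    NamedStage(String name, StageSketch next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void process(String request, List<String> trace) {
        trace.add(name); // record the visit, then hand the request on
        if (next != null) {
            next.process(request, trace);
        }
    }
}
```

As in the original method, each chain is built back to front: a stage can only be constructed once the stage it forwards to exists.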
Follower handling of requests from the leader

This is the fragment of followLeader that handles requests from the leader; I will analyze it in the article on how a ZK ensemble processes requests.

  while (this.isRunning()) {
                    // Read the next message sent by the leader
                    readPacket(qp);
                    // Handle the message received from the leader
                    processPacket(qp);
                }
Leader execution engine startup

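Once the leader has received acks for its NEWLEADER from a quorum, it starts the ZK execution engine, which includes creating the session tracker and initializing the request processor chain. Below is how the leader's request processor chain is initialized; I will cover these processors in detail in the next article, on how ZK processes requests.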

 protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
        // Set up the manager thread that takes care of container-type nodes
        setupContainerManager();
    }
Leader QuorumPeer

Once the leader's execution engine is up, the leader's main thread enters a loop that periodically checks whether each follower is still in sync.
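
The article does not show that loop, so the following is only a rough sketch of the idea under assumptions of my own: a hypothetical `QuorumHealthSketch` that tracks the time of each follower's last ack and applies a plain majority rule. The real leader asks its `QuorumVerifier` and the per-handler sync checks rather than a timestamp map.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical periodic check: does the leader still have a majority of in-sync servers?
final class QuorumHealthSketch {
    static boolean stillHasQuorum(long leaderId, Map<Long, Long> lastAckTimeMs,
                                  long nowMs, long syncTimeoutMs, int votingMembers) {
        Set<Long> synced = new HashSet<>();
        synced.add(leaderId); // the leader always counts itself
        for (Map.Entry<Long, Long> entry : lastAckTimeMs.entrySet()) {
            if (nowMs - entry.getValue() <= syncTimeoutMs) {
                synced.add(entry.getKey());
            }
        }
        // Assumption: simple majority of a fixed-size ensemble.
        return synced.size() > votingMembers / 2;
    }

    public static void main(String[] args) {
        Map<Long, Long> lastAck = new HashMap<>();
        lastAck.put(2L, 900L); // follower 2 acked recently
        lastAck.put(3L, 100L); // follower 3 has been silent for a while
        System.out.println(stillHasQuorum(1L, lastAck, 1000L, 200L, 3));
        lastAck.put(2L, 100L); // now follower 2 has gone quiet as well
        System.out.println(stillHasQuorum(1L, lastAck, 1000L, 200L, 3));
    }
}
```

When a check like this fails, the leader can no longer commit anything, so it has to step down and let the ensemble run a new election.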

LearnerHandler request handling

Once the leader has started, each LearnerHandler enters its request-handling loop; I will analyze this part in the next article.
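
One detail of that loop is worth isolating: for a REQUEST packet the handler reads a long session id, an int cxid and an int type from the packet data, and treats whatever is left as the request body. The round trip below is a small sketch of that layout; `RequestPayloadSketch`, its `Header` class and the `encode` helper are hypothetical and exist only to make the example runnable.

```java
import java.nio.ByteBuffer;

// Hypothetical round trip for the REQUEST payload layout: sessionId (8) | cxid (4) | type (4) | body.
final class RequestPayloadSketch {
    static final class Header {
        final long sessionId;
        final int cxid;
        final int type;
        final ByteBuffer body;

        Header(long sessionId, int cxid, int type, ByteBuffer body) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.type = type;
            this.body = body;
        }
    }

    // Illustration only: builds a payload in the order the handler reads it.
    static byte[] encode(long sessionId, int cxid, int type, byte[] body) {
        ByteBuffer bb = ByteBuffer.allocate(8 + 4 + 4 + body.length);
        bb.putLong(sessionId).putInt(cxid).putInt(type).put(body);
        return bb.array();
    }

    // Mirrors the reads in the REQUEST branch: getLong, getInt, getInt, then slice().
    static Header decode(byte[] data) {
        ByteBuffer bb = ByteBuffer.wrap(data);
        long sessionId = bb.getLong();
        int cxid = bb.getInt();
        int type = bb.getInt();
        return new Header(sessionId, cxid, type, bb.slice());
    }

    public static void main(String[] args) {
        // The type value 42 is arbitrary and not a real OpCode.
        Header h = decode(encode(0x10L, 7, 42, new byte[] {1, 2, 3}));
        System.out.println("session=0x" + Long.toHexString(h.sessionId)
            + " cxid=" + h.cxid + " type=" + h.type + " bodyBytes=" + h.body.remaining());
    }
}
```

`slice()` gives the body its own position and limit, so later stages can read it from offset 0 without knowing how long the header was.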

 while (true) {
                qp = new QuorumPacket();
                ia.readRecord(qp, "packet");
                messageTracker.trackReceived(qp.getType());

                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                if (qp.getType() == Leader.PING) {
                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
                }
                tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();

                packetsReceived.incrementAndGet();

                ByteBuffer bb;
                long sessionId;
                int cxid;
                int type;

                switch (qp.getType()) {
                case Leader.ACK:
                    if (this.learnerType == LearnerType.OBSERVER) {
                        LOG.debug("Received ACK from Observer {}", this.sid);
                    }
                    syncLimitCheck.updateAck(qp.getZxid());
                    learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                    break;
                case Leader.PING:
                    // Process the touches
                    ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                    DataInputStream dis = new DataInputStream(bis);
                    while (dis.available() > 0) {
                        long sess = dis.readLong();
                        int to = dis.readInt();
                        learnerMaster.touch(sess, to);
                    }
                    break;
                case Leader.REVALIDATE:
                    ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
                    learnerMaster.revalidateSession(qp, this);
                    break;
                case Leader.REQUEST:
                    bb = ByteBuffer.wrap(qp.getData());
                    sessionId = bb.getLong();
                    cxid = bb.getInt();
                    type = bb.getInt();
                    bb = bb.slice();
                    Request si;
                    if (type == OpCode.sync) {
                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                    } else {
                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                    }
                    si.setOwner(this);
                    learnerMaster.submitLearnerRequest(si);
                    requestsReceived.incrementAndGet();
                    break;
                default:
                    LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                    break;
                }
            }
        } catch (IOException e) {
            if (sock != null && !sock.isClosed()) {
                LOG.error("Unexpected exception causing shutdown while sock still open", e);
                //close the socket to make sure the
                //other side can see it being close
                try {
                    sock.close();
                } catch (IOException ie) {
                    // do nothing
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected exception in LearnerHandler.", e);
        } catch (SyncThrottleException e) {
            LOG.error("too many concurrent sync.", e);
            syncThrottler = null;
        } catch (Exception e) {
            LOG.error("Unexpected exception in LearnerHandler.", e);
            throw e;
        } finally {
            if (syncThrottler != null) {
                syncThrottler.endSync();
                syncThrottler = null;
            }
            String remoteAddr = getRemoteAddress();
            LOG.warn("******* GOODBYE {} ********", remoteAddr);
            messageTracker.dumpToLog(remoteAddr);
            shutdown();
        }
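The `Leader.REQUEST` branch above reads a fixed header from the packet payload (an 8-byte session id, a 4-byte cxid, a 4-byte op type) and then calls `slice()` so that the remaining bytes become the request body. The following is a minimal sketch of that framing, assuming the same field order as the reads in the loop. `RequestFramingSketch`, `Decoded`, `encode` and `decode` are hypothetical names introduced for illustration, and the values used in `main` are arbitrary.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RequestFramingSketch {
    // Hypothetical holder for the decoded fields; not a ZooKeeper class.
    static final class Decoded {
        final long sessionId;
        final int cxid;
        final int type;
        final ByteBuffer body;

        Decoded(long sessionId, int cxid, int type, ByteBuffer body) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.type = type;
            this.body = body;
        }
    }

    // Encode: 8-byte session id, 4-byte cxid, 4-byte op type, then the request body.
    static byte[] encode(long sessionId, int cxid, int type, byte[] body) {
        ByteBuffer bb = ByteBuffer.allocate(Long.BYTES + 2 * Integer.BYTES + body.length);
        bb.putLong(sessionId).putInt(cxid).putInt(type).put(body);
        return bb.array();
    }

    // Decode in the same order as the REQUEST branch: getLong, getInt, getInt, slice.
    static Decoded decode(byte[] data) {
        ByteBuffer bb = ByteBuffer.wrap(data);
        long sessionId = bb.getLong();
        int cxid = bb.getInt();
        int type = bb.getInt();
        // slice() yields a buffer that starts right after the header (position 0 = first body byte).
        return new Decoded(sessionId, cxid, type, bb.slice());
    }

    public static void main(String[] args) {
        byte[] wire = encode(0x1234L, 7, 9, "abc".getBytes(StandardCharsets.UTF_8));
        Decoded d = decode(wire);
        // duplicate() keeps d.body's position untouched while the bytes are decoded to text.
        String bodyText = StandardCharsets.UTF_8.decode(d.body.duplicate()).toString();
        System.out.println(d.sessionId + " " + d.cxid + " " + d.type + " " + bodyText);
    }
}
```

Slicing rather than copying means the body shares the payload's backing array, so the header is skipped without allocating a second buffer.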

This completes the analysis of zk Leader and Follower initialization.
