ZooKeeper Source Code Analysis (6) - Data and Storage

In ZooKeeper, data storage falls into two parts: in-memory storage and on-disk storage. This article analyzes the initialization of the in-memory database at server startup and the data synchronization process between the leader and its learners. Before that, it introduces the basic classes involved in data storage.
DataTree
ZooKeeper's data model is a tree. DataTree is the core of in-memory data storage and represents a complete, up-to-date copy of the data in memory, including all node paths, node data, ACL information, and the corresponding watches. Its main fields are:

    //key: node path; value: the node's data (DataNode). Holds every znode in real time; the ConcurrentHashMap guarantees safe concurrent access
    private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();

    //watches on node data
    private final WatchManager dataWatches = new WatchManager();

    //watches on node children
    private final WatchManager childWatches = new WatchManager();

    //key: sessionId; value: paths of the ephemeral nodes owned by that session, for fast access and cleanup
    private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();

    //This set contains the paths of all container nodes
    private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    //This set contains the paths of all ttl nodes
    private final Set<String> ttls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    //the largest zxid applied to the in-memory database
    public volatile long lastProcessedZxid = 0;
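
As a quick illustration of the ephemerals bookkeeping, here is a minimal self-contained sketch (class and method names are mine, not the real DataTree API) of tracking and cleaning up a session's ephemeral nodes:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

//Minimal illustration of the ephemerals map idea; names are hypothetical.
public class EphemeralTracker {
    private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<>();

    //Record that a session created an ephemeral node.
    public void add(long sessionId, String path) {
        ephemerals.computeIfAbsent(sessionId, k -> new HashSet<>()).add(path);
    }

    //On session close, return all paths that must be deleted from the tree.
    public Set<String> removeSession(long sessionId) {
        HashSet<String> paths = ephemerals.remove(sessionId);
        return paths == null ? new HashSet<>() : paths;
    }
}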

DataNode
The smallest unit of data storage. It holds a node's data content, its stat, the list of its children, and the operations on those children. Main fields:

    //node data content
    byte data[];
    //reference into the ACL cache for this node's ACL
    Long acl;
    //node stat, holding metadata such as ephemeralOwner, czxid, etc.
    public StatPersisted stat;
    //paths of the children relative to this node, not including the parent path itself
    private Set<String> children = null;

ZKDatabase
ZooKeeper's in-memory database. It manages all of ZooKeeper's sessions, the DataTree, and the transaction log. It dumps snapshots to disk after a certain number of logged transactions (controlled mainly by snapCount), and at server startup it rebuilds the complete in-memory database from the transaction log files and snapshot files on disk. Main fields:

    protected DataTree dataTree;
    //key: sessionId; value: the session's timeout
    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
    //handles the on-disk transaction log files and snapshot files
    protected FileTxnSnapLog snapLog;
    //used during leader/learner data synchronization
    protected long minCommittedLog, maxCommittedLog;
    //at most commitLogCount proposals are cached
    public static final int commitLogCount = 500;
    protected static int commitLogBuffer = 700;
    //cache of recently committed proposals, used for fast follower synchronization
    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();

On-disk storage consists mainly of transaction log files and snapshot files, handled by the FileTxnLog and FileSnap classes respectively.
FileTxnLog
Implements the TxnLog interface, providing APIs for reading and appending transaction logs. First, let's look at the format of a transaction log file:

LogFile:
//a log file consists of the following three parts
 *     FileHeader TxnList ZeroPad
//1. the file header
 * FileHeader: {
 *     magic 4bytes (ZKLG)
 *     version 4bytes
 *     dbid 8bytes
 *   }
 //the transaction entries
 * TxnList:
 *     Txn || Txn TxnList

 * Txn:
//the parts of one transaction log entry
 *     checksum Txnlen TxnHeader Record 0x42

 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
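
To make the header layout concrete, the following standalone sketch (mine, not part of ZooKeeper) reads and validates a log file's FileHeader with plain java.io; Jute writes these fields big-endian, which is what DataInputStream expects:

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class LogHeaderReader {
    //"ZKLG" packed into an int, the same value FileTxnLog uses as its magic
    static final int TXNLOG_MAGIC = ByteBuffer.wrap("ZKLG".getBytes()).getInt();

    public static void main(String[] args) throws IOException {
        try (DataInputStream in = new DataInputStream(new FileInputStream(args[0]))) {
            int magic = in.readInt();      // 4 bytes
            int version = in.readInt();    // 4 bytes
            long dbid = in.readLong();     // 8 bytes
            if (magic != TXNLOG_MAGIC) {
                throw new IOException("Invalid magic number " + magic);
            }
            System.out.println("version=" + version + " dbid=" + dbid);
        }
    }
}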

The two processes analyzed here are appending to the log and truncating it.
Appending to the log

public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr == null) {
            return false;
        }
//lastZxidSeen: the largest (most recent) zxid seen so far
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
//if there is no log file to write to, attach a new file stream, write the FileHeader, and flush it to disk immediately
        if (logStream==null) {
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }

           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           oa = BinaryOutputArchive.getArchive(logStream);
           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
           fhdr.serialize(oa, "fileheader");
           // Make sure that the magic number is written before padding.
           logStream.flush();
           filePadding.setCurrentSize(fos.getChannel().position());
           streamsToFlush.add(fos);
        }
//decide whether the log file needs to grow (preallocation)
        filePadding.padFile(fos.getChannel());
//serialize the transaction
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
//generate the checksum
        Checksum crc = makeChecksumAlgorithm();
        crc.update(buf, 0, buf.length);
        oa.writeLong(crc.getValue(), "txnEntryCRC");
//write the entry into the log file stream
        Util.writeTxnBytes(oa, buf);

        return true;
    }

The main steps are:
1. Check whether there is a log file to write to
Right after server startup, when the first transaction has yet to be logged, or after the previous log file has been rolled, the writer is not attached to any log file. When logStream == null, a new file stream is attached, the FileHeader is written, and the stream is immediately flushed to disk.
2. Decide whether the log file needs to grow (preallocation)

long padFile(FileChannel fileChannel) throws IOException {
//currentSize: the current known (padded) size of the file
//preAllocSize: 64MB by default
        long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
        if (currentSize != newFileSize) {
            fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
            currentSize = newFileSize;
        }
        return currentSize;
    }
//decide whether the file needs to grow
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
        // If preAllocSize is positive and we are within 4KB of the known end of the file calculate a new file size
        if (preAllocSize > 0 && position + 4096 >= fileSize) {
            // If we have written more than we have previously preallocated we need to make sure the new
            // file size is larger than what we already have
            if (position > fileSize) {
                fileSize = position + preAllocSize;
                fileSize -= fileSize % preAllocSize;
            } else {
                fileSize += preAllocSize;
            }
        }

        return fileSize;
    }

calculateFileSizeWithPadding shows that once the write position comes within 4KB of the current known end of the file, the file size currentSize is grown by preAllocSize (64MB by default) and the unwritten tail is filled with zeros. The benefit is avoiding fresh disk block allocation on every append and reducing disk seeks.
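
A worked example with the default 64MB preAllocSize (the concrete numbers are mine):

//Case 1: position is within 4KB of the known end.
//  position = 67,106,816 (64MB - 2KB), fileSize = 67,108,864 (64MB)
//  position + 4096 >= fileSize and position <= fileSize,
//  so fileSize += preAllocSize  ->  134,217,728 (128MB)

//Case 2: position has overshot the known end (e.g. after a large batch).
//  position = 68,157,440 (65MB), fileSize = 67,108,864 (64MB)
//  fileSize = position + preAllocSize = 135,266,304, then rounded down
//  to a multiple of preAllocSize  ->  134,217,728 (128MB)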
3. Serialize the transaction
The transaction header (TxnHeader) and the transaction body (Record) are serialized separately; see ZooKeeper Source Code Analysis (5) - Serialization and Protocol.
4. Generate the checksum
The checksum makes it possible to verify the integrity and correctness of the log file later.
5. Write to the log file stream
The header, body, and checksum are written into the stream. Because the output stream is a BufferedOutputStream, the entry first sits in the buffer and is not yet actually written out.
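
Durability therefore comes from a later explicit flush. A generic sketch of the same buffered-write-then-force pattern, as an illustration rather than the ZooKeeper code:

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class FlushExample {
    public static void main(String[] args) throws IOException {
        FileOutputStream fos = new FileOutputStream("example.log");
        BufferedOutputStream bos = new BufferedOutputStream(fos);
        bos.write(new byte[]{1, 2, 3});   // sits in the in-memory buffer
        bos.flush();                      // pushes the buffer to the OS page cache
        fos.getChannel().force(false);    // fsync: forces the bytes onto disk
        bos.close();
    }
}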
日志截斷
在主從同步時绝骚,如果learner服務(wù)器的事務(wù)ID大于leader服務(wù)器的事務(wù)ID,將會要求learner服務(wù)器丟棄掉比leader服務(wù)器的事務(wù)ID大的事務(wù)日志毕籽。
FileTxnIterator是可以指定zxid的事務(wù)日志迭代器,也就是說如果需要從zxid=11的位置開始創(chuàng)建一個迭代器,那么該臺服務(wù)器上面在zxid=11之后的日志都會保存在該迭代器中并闲。其主要屬性為:

 public static class FileTxnIterator implements TxnLog.TxnIterator {
        //directory holding the transaction log files
        File logDir;
        //the zxid the iterator starts from
        long zxid;
        //header of the log entry the iterator currently points at
        TxnHeader hdr;
        //the transaction record the iterator currently points at
        Record record;
        //the log file containing zxid
        File logFile;
        //input archive
        InputArchive ia;
        static final String CRC_ERROR="CRC check failed";
        //input stream, positioned at zxid
        PositionInputStream inputStream=null;
        //the log files newer than the one containing zxid
        private ArrayList<File> storedFiles;
··········code omitted·······
}
public boolean truncate(long zxid) throws IOException {
        FileTxnIterator itr = null;
        try {
            itr = new FileTxnIterator(this.logDir, zxid);
            PositionInputStream input = itr.inputStream;
            if(input == null) {
                throw new IOException("No log files found to truncate! This could " +
                        "happen if you still have snapshots from an old setup or " +
                        "log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
            }
            long pos = input.getPosition();
            // now, truncate at the current position
            RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
            raf.setLength(pos);
            raf.close();
            while(itr.goToNextLog()) {
                if (!itr.logFile.delete()) {
                    LOG.warn("Unable to truncate {}", itr.logFile);
                }
            }
        } finally {
            close(itr);
        }
        return true;
    }

As the code shows, truncation shrinks the log file containing zxid so that every entry after zxid is dropped, and then deletes every newer log file outright.
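
A minimal usage sketch (the directory path is hypothetical; this assumes the 3.5.x API where FileTxnLog exposes truncate directly):

import java.io.File;
import org.apache.zookeeper.server.persistence.FileTxnLog;

public class TruncateExample {
    public static void main(String[] args) throws Exception {
        //drop every logged transaction with zxid > 0xb
        FileTxnLog txnLog = new FileTxnLog(new File("/data/zookeeper/version-2"));
        boolean ok = txnLog.truncate(0xbL);
        System.out.println("truncated: " + ok);
    }
}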

FileSnap
A data snapshot records the full in-memory data of a ZooKeeper server at a point in time and writes it to a given location on disk. The stored content covers the DataTree and the session information. FileSnap implements the snapshot interface: storing, serializing, deserializing, and locating snapshot files.

FileTxnSnapLog
Wraps a TxnLog and a SnapShot, providing the restore method that rebuilds the in-memory database from disk and the save method that persists a snapshot. Main fields:

    //the directory containing the transaction logs
    private final File dataDir;
    //the directory containing the snapshots
    private final File snapDir;
    private TxnLog txnLog;
    private SnapShot snapLog;
    //version number
    public final static int VERSION = 2;
    //prefix of the version subdirectory name
    public final static String version = "version-";

First, the save method that persists a snapshot:

//syncSnap: sync the snapshot immediately after write
public void save(DataTree dataTree,
                     ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
                     boolean syncSnap)
        throws IOException {
        long lastZxid = dataTree.lastProcessedZxid;
        File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
        LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
                snapshotFile);
        snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);

    }

The flow is straightforward: build the snapshot file name from the dataTree's latest transaction id, then serialize the dataTree together with sessionsWithTimeouts (the session info) into that file at the configured disk location.
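
Snapshot files are named after the hex form of the last zxid they cover; a small sketch of the naming scheme (the "snapshot." prefix matches what Util.makeSnapshotName produces):

public class SnapshotName {
    //e.g. lastZxid = 0x1234 -> "snapshot.1234"
    static String makeSnapshotName(long zxid) {
        return "snapshot." + Long.toHexString(zxid);
    }

    public static void main(String[] args) {
        System.out.println(makeSnapshotName(0x1234L)); // snapshot.1234
    }
}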

Data initialization during server startup

This is the process of deserializing the latest on-disk snapshot (the full data) plus the transaction logs written after it (the incremental data) into the in-memory database.

Back in ZooKeeper Source Code Analysis (1) - Server Startup Flow: at server startup, FileTxnSnapLog is initialized first, then ZKDatabase.
1. Initialize FileTxnSnapLog

 public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
        LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);

        this.dataDir = new File(dataDir, version + VERSION);
        this.snapDir = new File(snapDir, version + VERSION);

        // by default create snap/log dirs, but otherwise complain instead
        // See ZOOKEEPER-1161 for more details
        boolean enableAutocreate = Boolean.valueOf(
                System.getProperty(ZOOKEEPER_DATADIR_AUTOCREATE,
                        ZOOKEEPER_DATADIR_AUTOCREATE_DEFAULT));

        if (!this.dataDir.exists()) {
            if (!enableAutocreate) {
                throw new DatadirException("Missing data directory "
                        + this.dataDir
                        + ", automatic data directory creation is disabled ("
                        + ZOOKEEPER_DATADIR_AUTOCREATE
                        + " is false). Please create this directory manually.");
            }

            if (!this.dataDir.mkdirs()) {
                throw new DatadirException("Unable to create data directory "
                        + this.dataDir);
            }
        }
        if (!this.dataDir.canWrite()) {
            throw new DatadirException("Cannot write to data directory " + this.dataDir);
        }

        if (!this.snapDir.exists()) {
            // by default create this directory, but otherwise complain instead
            // See ZOOKEEPER-1161 for more details
            if (!enableAutocreate) {
                throw new DatadirException("Missing snap directory "
                        + this.snapDir
                        + ", automatic data directory creation is disabled ("
                        + ZOOKEEPER_DATADIR_AUTOCREATE
                        + " is false). Please create this directory manually.");
            }

            if (!this.snapDir.mkdirs()) {
                throw new DatadirException("Unable to create snap directory "
                        + this.snapDir);
            }
        }
        if (!this.snapDir.canWrite()) {
            throw new DatadirException("Cannot write to snap directory " + this.snapDir);
        }

        // check content of transaction log and snapshot dirs if they are two different directories
        // See ZOOKEEPER-2967 for more details
        if(!this.dataDir.getPath().equals(this.snapDir.getPath())){
//when dataDir and snapDir differ, check that dataDir contains no snapshot files and snapDir contains no transaction log files
            checkLogDir();
            checkSnapDir();
        }

        txnLog = new FileTxnLog(this.dataDir);
        snapLog = new FileSnap(this.snapDir);

        autoCreateDB = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DB_AUTOCREATE,
                ZOOKEEPER_DB_AUTOCREATE_DEFAULT));
    }

A version-2 subdirectory is created under the given datadir and snapdir, with checks that each directory exists and is writable; after that, txnLog and snapLog are created.
2. Initialize ZKDatabase

 public ZKDatabase(FileTxnSnapLog snapLog) {
        dataTree = new DataTree();
        sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
        this.snapLog = snapLog;
·······
}

This mainly initializes DataTree and sessionsWithTimeouts. The DataTree constructor creates ZooKeeper's own configuration nodes, such as /, /zookeeper, and /zookeeper/quota, which belong to the server itself.
Data initialization is then driven by ZooKeeperServer.loadData:

public void loadData() throws IOException, InterruptedException {
//on the leader, lead() calls this method a second time; by then zkDb.isInitialized() == true, so only the snapshot is taken
        if(zkDb.isInitialized()){
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        }
        else {
//first-time initialization
            setZxid(zkDb.loadDataBase());
        }
·········session-expiry cleanup code···········
        // Make a clean snapshot
        takeSnapshot();
    }

    public void takeSnapshot() {
        takeSnapshot(false);
    }

    public void takeSnapshot(boolean syncSnap){
       txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
·········exception handling omitted···········
     }

On first-time initialization zkDb.loadDataBase() is called, which ultimately returns the latest transaction id of the in-memory database.

public long loadDataBase() throws IOException {
        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        return zxid;
    }

That in turn calls FileTxnSnapLog.restore. First, a note on FileTxnSnapLog's inner interface PlayBackListener:
it receives callbacks while transactions are being applied. In the later phase of data recovery there is a transaction replay step (the deserialization of the incremental data), which calls PlayBackListener.onTxnLoaded for every applied transaction so the caller can do its own bookkeeping. The listener passed in here is commitProposalPlaybackListener.

FileTxnSnapLog.restore

//the DataTree dt and Map<Long, Integer> sessions parameters are the objects to restore into - in fact the fields of ZKDatabase
//the PlayBackListener is called back while replaying the transaction log
    public long restore(DataTree dt, Map<Long, Integer> sessions,
                        PlayBackListener listener) throws IOException {
//deserialize the snapshot data
        long deserializeResult = snapLog.deserialize(dt, sessions);
        FileTxnLog txnLog = new FileTxnLog(dataDir);
        boolean trustEmptyDB;
        File initFile = new File(dataDir.getParent(), "initialize");
        if (Files.deleteIfExists(initFile.toPath())) {
            LOG.info("Initialize file found, an empty database will not block voting participation");
            trustEmptyDB = true;
        } else {
//otherwise, trusting an empty database depends on the zookeeper.db.autocreate setting
            trustEmptyDB = autoCreateDB;
        }
        if (-1L == deserializeResult) {
            /* this means that we couldn't find any snapshot, so we need to
             * initialize an empty database (reported in ZOOKEEPER-2325) */
            if (txnLog.getLastLoggedZxid() != -1) {
                throw new IOException(
                        "No snapshot found, but there are log entries. " +
                        "Something is broken!");
            }
//an empty on-disk database is trusted by default, since the data is normally empty the first time a server starts
            if (trustEmptyDB) {
                /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
                 *       or use Map on save() */
                save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);

                /* return a zxid of 0, since we know the database is empty */
                return 0L;
            } else {
                /* return a zxid of -1, since we are possibly missing data */
                LOG.warn("Unexpected empty data tree, setting zxid to -1");
                dt.lastProcessedZxid = -1L;
                return -1L;
            }
        }
        return fastForwardFromEdits(dt, sessions, listener);
    }

3. Deserialize the snapshot data
snapLog.deserialize(dt, sessions) restores the snapshot into the dataTree and sessions. It collects up to the 100 most recent snapshot files and tries them in order until one is non-empty and passes the CRC check, then returns the highest zxid encoded in that snapshot file's name.

public long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException {
        // we run through 100 snapshots (not all of them)
        // if we cannot get it running within 100 snapshots
        // we should  give up
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {
            return -1L;
        }
        File snap = null;
        boolean foundValid = false;
        for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
            snap = snapList.get(i);
            LOG.info("Reading snapshot " + snap);
            try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
                 CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
                InputArchive ia = BinaryInputArchive.getArchive(crcIn);
                deserialize(dt, sessions, ia);
                long checkSum = crcIn.getChecksum().getValue();
                long val = ia.readLong("val");
                if (val != checkSum) {
                    throw new IOException("CRC corruption in snapshot :  " + snap);
                }
                foundValid = true;
                break;
            } catch (IOException e) {
                LOG.warn("problem reading snap file " + snap, e);
            }
        }
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
        return dt.lastProcessedZxid;
    }

A return value of -1 means no snapshot file exists:
if the transaction log's last zxid is also -1, the disk holds no data at all, so an empty snapshot is written out and 0 is returned as the latest transaction id. Otherwise, fastForwardFromEdits is called.

4. Fast-forward to the latest zxid

public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                     PlayBackListener listener) throws IOException {

        TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
        long highestZxid = dt.lastProcessedZxid;
        TxnHeader hdr;
        try {
            while (true) {
                // iterator points to
                // the first valid txn when initialized
                hdr = itr.getHeader();
                if (hdr == null) {
                    //empty logs
                    return dt.lastProcessedZxid;
                }
                if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                    LOG.error("{}(highestZxid) > {}(next log) for type {}",
                            highestZxid, hdr.getZxid(), hdr.getType());
                } else {
                    highestZxid = hdr.getZxid();
                }
                try {
                    processTransaction(hdr,dt,sessions, itr.getTxn());
                } catch(KeeperException.NoNodeException e) {
                   throw new IOException("Failed to process transaction type: " +
                         hdr.getType() + " error: " + e.getMessage(), e);
                }
                listener.onTxnLoaded(hdr, itr.getTxn());
                if (!itr.next())
                    break;
            }
        } finally {
            if (itr != null) {
                itr.close();
            }
        }
        return highestZxid;
    }

First an iterator over the transaction logs is opened at dt.lastProcessedZxid + 1; these entries are the incremental data that still needs to be applied. The while loop walks through them one at a time, keeps updating highestZxid, and finally returns it.
5. Apply the transactions
Inside the loop, processTransaction applies each log entry, updating sessions and the DataTree according to the transaction type.
6. Replay callbacks
listener.onTxnLoaded is invoked for each applied entry; the listener is ZKDatabase's commitProposalPlaybackListener:

private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
        public void onTxnLoaded(TxnHeader hdr, Record txn){
            addCommittedProposal(hdr, txn);
        }
    };

private void addCommittedProposal(TxnHeader hdr, Record txn) {
        Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
        addCommittedProposal(r);
    }

    /**
     * maintains a list of last <i>committedLog</i>
     *  or so committed requests. This is used for
     * fast follower synchronization.
     * @param request committed request
     */
    public void addCommittedProposal(Request request) {
        WriteLock wl = logLock.writeLock();
        try {
            wl.lock();
            if (committedLog.size() > commitLogCount) {
                committedLog.removeFirst();
                minCommittedLog = committedLog.getFirst().packet.getZxid();
            }
            if (committedLog.isEmpty()) {
//first cached entry: initialize both bounds
                minCommittedLog = request.zxid;
                maxCommittedLog = request.zxid;
            }

            byte[] data = SerializeUtils.serializeRequest(request);
            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
            Proposal p = new Proposal();
            p.packet = pp;
            p.request = request;
            committedLog.add(p);
            maxCommittedLog = p.packet.getZxid();
        } finally {
            wl.unlock();
        }
    }

The main logic is in addCommittedProposal, which maintains LinkedList<Proposal> committedLog, a bounded cache holding each incoming committed transaction. minCommittedLog holds the zxid of the first cached entry, maxCommittedLog the zxid of the last. These three fields are what the leader uses later to make fast synchronization decisions for its learners.
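
Condensed into a sketch, the decision these fields enable (a simplification of syncFollower, which appears in full below) looks roughly like this:

public class SyncDecision {
    enum SyncMode { DIFF_EMPTY, TRUNC, DIFF, SNAP }

    //Simplified: the real LearnerHandler.syncFollower also handles new-epoch
    //zxids, forced snapshots, on-disk txnlog sync, and an empty committedLog.
    static SyncMode decide(long peerLastZxid, long lastProcessedZxid,
                           long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid == lastProcessedZxid) {
            return SyncMode.DIFF_EMPTY;   // already in sync: empty DIFF
        } else if (peerLastZxid > maxCommittedLog) {
            return SyncMode.TRUNC;        // learner is ahead: roll it back
        } else if (peerLastZxid >= minCommittedLog) {
            return SyncMode.DIFF;         // inside the cache window: send the gap
        } else {
            return SyncMode.SNAP;         // too far behind: full snapshot
        }
    }
}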
7. Epoch check
The epoch identifies the current leader's period: every election that produces a new leader also produces a new epoch, and servers carry the epoch in the messages they exchange to make sure they are operating within the same leader period.
When the leader finishes data initialization, it puts its own currentEpoch and the highest zxid it just recovered into leaderStateSummary, and compares that against the epoch and highest zxid reported by each connecting learner. Only if the leader's leaderStateSummary is newer than the learner's StateSummary is the leader's data confirmed to be at least as recent as the learner's; only then can the leader open a new epoch and start data synchronization.
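
The comparison is lexicographic on the (epoch, zxid) pair; a minimal sketch of the idea (my simplification, not the real StateSummary class):

//(epoch, zxid) ordering used to decide whose data is more recent
class StateSummarySketch {
    final long currentEpoch;
    final long lastZxid;

    StateSummarySketch(long currentEpoch, long lastZxid) {
        this.currentEpoch = currentEpoch;
        this.lastZxid = lastZxid;
    }

    boolean isMoreRecentThan(StateSummarySketch other) {
        return currentEpoch > other.currentEpoch
                || (currentEpoch == other.currentEpoch && lastZxid > other.lastZxid);
    }
}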

Data synchronization between leader and learners

As described in ZooKeeper Source Code Analysis (4) - Election Flow and Server Startup Handling, synchronization starts once the LearnerHandler receives the learner's ACKEPOCH message.

How the leader sends the sync data

LearnerHandler.run

public void run() {
····interaction before the ACKEPOCH message omitted···
            //learner zxid
            peerLastZxid = ss.getLastZxid();
           
            // Take any necessary action if we need to send TRUNC or DIFF
            // startForwarding() will be called in all cases
//decide whether a full (snapshot) sync is needed
            boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
            
            LOG.debug("Sending NEWLEADER message to " + sid);
            // the version of this quorumVerifier will be set by leader.lead() in case
            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
            // we got here, so the version was set
//send the NEWLEADER message
            if (getVersion() < 0x10000) {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, null, null);
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
                                .toString().getBytes(), null);
                queuedPackets.add(newLeaderQP);
            }
//force flush; this is the flush for the DIFF/TRUNC/TRUNC+DIFF sync paths
            bufferedOutput.flush();

            /* if we are not truncating or sending a diff just send a snapshot */
            if (needSnap) {
//full sync
                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                LearnerSnapshot snapshot = 
                        leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                try {
                    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                    bufferedOutput.flush();
                    // Dump data to peer
                    leader.zk.getZKDatabase().serializeSnapshot(oa);
                    oa.writeString("BenWasHere", "signature");
//force flush; this is the flush for the SNAP sync path
                    bufferedOutput.flush();
                } finally {
                    snapshot.close();
                }
            }

            // Start thread that blast packets in the queue to learner
            startSendingPackets();
            
//wait for the learner's ACK to the NEWLEADER message, signalling that sync is complete
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.ACK){
                LOG.error("Next packet was supposed to be an ACK,"
                    + " but received packet: {}", packetToString(qp));
                return;
            }

            if(LOG.isDebugEnabled()){
                LOG.debug("Received NEWLEADER-ACK message from " + sid);   
            }
            leader.waitForNewLeaderAck(getSid(), qp.getZxid());
//sync-time check: synchronization must finish within tickTime * syncLimit
            syncLimitCheck.start();
            
            // now that the ack has been processed expect the syncLimit
            sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

            /*
             * Wait until leader starts up
             */
            synchronized(leader.zk){
                while(!leader.zk.isRunning() && !this.isInterrupted()){
                    leader.zk.wait(20);
                }
            }
            // Mutation packets will be queued during the serialize,
            // so we need to mark when the peer can actually start
            // using the data
            LOG.debug("Sending UPTODATE message to " + sid);      
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

···············sync complete; normal communication with the learner begins···········
    }

As mentioned during data initialization, the in-memory database zkDatabase keeps the incremental data since the latest snapshot:
LinkedList<Proposal> committedLog: stores each incoming committed transaction
minCommittedLog: zxid of the first cached entry
maxCommittedLog: zxid of the last cached entry
The leader compares the learner's latest transaction id peerLastZxid against minCommittedLog/maxCommittedLog to decide between a differential sync and a full sync. The main logic is in syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader):

public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
//whether the learner's zxid is a new-epoch zxid (its counter half is 0)
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        // Keep track of the latest zxid which already queued
        long currentZxid = peerLastZxid;
        boolean needSnap = true;
//whether syncing from the on-disk txnlog is enabled; it is by default, with snapshotSizeFactor = 0.33
        boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
        ReentrantReadWriteLock lock = db.getLogLock();
        ReadLock rl = lock.readLock();
        try {
            rl.lock();
            long maxCommittedLog = db.getmaxCommittedLog();
            long minCommittedLog = db.getminCommittedLog();
            long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
            if (db.getCommittedLog().isEmpty()) {
                /*
                 * It is possible that committedLog is empty. In that case
                 * setting these value to the latest txn in leader db
                 * will reduce the case that we need to handle
                 *
                 * Here is how each case handle by the if block below
                 * 1. lastProcessZxid == peerZxid -> Handle by (2)
                 * 2. lastProcessZxid < peerZxid -> Handle by (3)
                 * 3. lastProcessZxid > peerZxid -> Handle by (5)
                 */
                minCommittedLog = lastProcessedZxid;
                maxCommittedLog = lastProcessedZxid;
            }

            /*
             * Here are the cases that we want to handle
             *
             * 1. Force sending snapshot (for testing purpose)
             * 2. Peer and leader is already sync, send empty diff
             * 3. Follower has txn that we haven't seen. This may be old leader
             *    so we need to send TRUNC. However, if peer has newEpochZxid,
             *    we cannot send TRUNC since the follower has no txnlog
             * 4. Follower is within committedLog range or already in-sync.
             *    We may need to send DIFF or TRUNC depending on follower's zxid
             *    We always send empty DIFF if follower is already in-sync
             * 5. Follower missed the committedLog. We will try to use on-disk
             *    txnlog + committedLog to sync with follower. If that fail,
             *    we will send snapshot
             */

            if (forceSnapSync) {
                // Force leader to use snapshot to sync with follower
                LOG.warn("Forcing snapshot sync - should not see this in production");
            } else if (lastProcessedZxid == peerLastZxid) {
                // Follower is already sync with us, send empty diff
//enqueue the packet onto queuedPackets, the queue responsible for shipping messages to the learner
                queueOpPacket(Leader.DIFF, peerLastZxid);
                needOpPacket = false;
                needSnap = false;
            } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
                // Newer than committedLog, send trunc and done
                queueOpPacket(Leader.TRUNC, maxCommittedLog);
                currentZxid = maxCommittedLog;
                needOpPacket = false;
                needSnap = false;
            } else if ((maxCommittedLog >= peerLastZxid)
                    && (minCommittedLog <= peerLastZxid)) {
                // Follower is within commitLog range
                Iterator<Proposal> itr = db.getCommittedLog().iterator();
//differential sync: send the entries in (peerLastZxid, maxZxid] to the learner
                currentZxid = queueCommittedProposals(itr, peerLastZxid,
                                                     null, maxCommittedLog);
                needSnap = false;
            } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
                // Use txnlog and committedLog to sync

                // Calculate sizeLimit that we allow to retrieve txnlog from disk
                long sizeLimit = db.calculateTxnLogSizeLimit();
                // This method can return empty iterator if the requested zxid
                // is older than on-disk txnlog
                Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
                        peerLastZxid, sizeLimit);
                if (txnLogItr.hasNext()) {
                    // first replay from the on-disk txnlog up to where committedLog starts
                    currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                                         minCommittedLog, maxCommittedLog);
                    // then continue from the in-memory committedLog
                    Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
                    currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                                         null, maxCommittedLog);
                    needSnap = false;
                }
                // closing the resources
                if (txnLogItr instanceof TxnLogProposalIterator) {
                    TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
                    txnProposalItr.close();
                }
            } else {
                LOG.warn("Unhandled scenario for peer sid: " +  getSid());
            }
            LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
                      " for peer sid: " +  getSid());
//lets the leader know that a follower is capable of following and is done syncing
//startForwarding also queues the proposals that have been agreed but not yet committed
            leaderLastZxid = leader.startForwarding(this, currentZxid);
        } finally {
            rl.unlock();
        }
//needOpPacket: whether a leading TRUNC or DIFF packet still needs to be queued; true by default
        if (needOpPacket && !needSnap) {
            // This should never happen, but we should fall back to sending
            // snapshot just in case.
            LOG.error("Unhandled scenario for peer sid: " +  getSid() +
                     " fall back to use snapshot");
            needSnap = true;
        }
        return needSnap;
    }

There are roughly five sync modes:
1. Forced snapshot sync
forceSnapSync can be set to true for testing; it defaults to false.
2. No sync needed
The learner's and the leader's latest zxids match, so nothing needs syncing; an empty DIFF message is enough.
3. Rollback sync (TRUNC)
The learner's zxid peerLastZxid is larger than the leader's maxCommittedLog (and is not a new-epoch zxid), so the learner has to discard its log entries above that point; the leader sends a TRUNC message: queueOpPacket(Leader.TRUNC, maxCommittedLog);
4. Differential sync (TRUNC+DIFF)

  • peerLastZxid lies between minCommittedLog and maxCommittedLog, but no cached proposal carries exactly that zxid: first roll the learner back to prevProposalZxid, the closest cached zxid below peerLastZxid, then sync the zxids in (prevProposalZxid, maxZxid]

  • peerLastZxid lies between minCommittedLog and maxCommittedLog, and a proposal with exactly that zxid exists: just sync the zxids in (peerLastZxid, maxZxid]. The difference in handling between these two cases can be seen in LearnerHandler.queueCommittedProposals:
protected long queueCommittedProposals(Iterator<Proposal> itr,
            long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        long queuedZxid = peerLastZxid;
        // as we look through proposals, this variable keeps track of previous
        // proposal Id.
        long prevProposalZxid = -1;
        while (itr.hasNext()) {
            Proposal propose = itr.next();

            long packetZxid = propose.packet.getZxid();
            // abort if we hit the limit
            if ((maxZxid != null) && (packetZxid > maxZxid)) {
                break;
            }

            // skip the proposals the peer already has
            if (packetZxid < peerLastZxid) {
                prevProposalZxid = packetZxid;
                continue;
            }

            // If we are sending the first packet, figure out whether to trunc
            // or diff
            if (needOpPacket) {

                // Send diff when we see the follower's zxid in our history (the exact-match case)
                if (packetZxid == peerLastZxid) {
                    LOG.info("Sending DIFF zxid=0x" +
                             Long.toHexString(lastCommittedZxid) +
                             " for peer sid: " + getSid());
                    queueOpPacket(Leader.DIFF, lastCommittedZxid);
                    needOpPacket = false;
                    continue;
                }

                if (isPeerNewEpochZxid) {
                   // Send diff and fall through if zxid is of a new-epoch
                   LOG.info("Sending DIFF zxid=0x" +
                            Long.toHexString(lastCommittedZxid) +
                            " for peer sid: " + getSid());
                   queueOpPacket(Leader.DIFF, lastCommittedZxid);
                   needOpPacket = false;
                } else if (packetZxid > peerLastZxid  ) {
                    // Peer has some proposals that the leader hasn't seen yet (the TRUNC+DIFF case);
                    // it may used to be a leader
                    if (ZxidUtils.getEpochFromZxid(packetZxid) !=
                            ZxidUtils.getEpochFromZxid(peerLastZxid)) {
                        // We cannot send TRUNC that cross epoch boundary.
                        // The learner will crash if it is asked to do so.
                        // We will send snapshot this those cases.
                        LOG.warn("Cannot send TRUNC to peer sid: " + getSid() +
                                 " peer zxid is from different epoch" );
                        return queuedZxid;
                    }

                    LOG.info("Sending TRUNC zxid=0x" +
                            Long.toHexString(prevProposalZxid) +
                            " for peer sid: " + getSid());
                    queueOpPacket(Leader.TRUNC, prevProposalZxid);
                    needOpPacket = false;
                }
            }

            if (packetZxid <= queuedZxid) {
                // We can get here, if we don't have op packet to queue
                // or there is a duplicate txn in a given iterator
                continue;
            }

            // Since this is already a committed proposal, we need to follow
            // it by a commit packet
//send a PROPOSAL message carrying the transaction data
            queuePacket(propose.packet);
//send a COMMIT message carrying only the zxid to commit
            queueOpPacket(Leader.COMMIT, packetZxid);
            queuedZxid = packetZxid;

        }

        if (needOpPacket && isPeerNewEpochZxid) {
            // We will send DIFF for this kind of zxid in any case. This if-block
            // is the catch when our history older than learner and there is
            // no new txn since then. So we need an empty diff
            LOG.info("Sending DIFF zxid=0x" +
                     Long.toHexString(lastCommittedZxid) +
                     " for peer sid: " + getSid());
            queueOpPacket(Leader.DIFF, lastCommittedZxid);
            needOpPacket = false;
        }

        return queuedZxid;
    }
  • If peerLastZxid < minCommittedLog, but the on-disk transactions after that position in txnLog are smaller in total size than snapSize * snapshotSizeFactor (a fraction of the latest snapshot's size), sync via txnLog + committedLog, in two parts:

currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                                         minCommittedLog, maxCommittedLog);

                    currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                                         null, maxCommittedLog);

5. Full sync (SNAP)
If peerLastZxid falls under none of the cases above, a full sync is performed: syncFollower returns true, and back in LearnerHandler.run a SNAP message is sent, followed by the whole serialized ZKDatabase.
After that, a thread is started to drain the queuedPackets queue asynchronously, and the leader waits for the learner's ACK confirming that synchronization has completed.

How the learner receives the sync data

After the learner has sent its ACKEPOCH message, it enters the sync process in Learner.syncWithLeader (both Follower and Observer call this method):

protected void syncWithLeader(long newLeaderZxid) throws Exception{
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
        
        QuorumVerifier newLeaderQV = null;
        
        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
        // For SNAP and TRUNC the snapshot is needed to save that history
        boolean snapshotNeeded = true;
        boolean syncSnapshot = false;
        readPacket(qp);
        LinkedList<Long> packetsCommitted = new LinkedList<Long>();
        LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
        synchronized (zk) {
            if (qp.getType() == Leader.DIFF) {
                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                snapshotNeeded = false;
            }
            else if (qp.getType() == Leader.SNAP) {
                LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
                // The leader is going to dump the database
                // db is clear as part of deserializeSnapshot()
                zk.getZKDatabase().deserializeSnapshot(leaderIs);
                // ZOOKEEPER-2819: overwrite config node content extracted
                // from leader snapshot with local config, to avoid potential
                // inconsistency of config node content during rolling restart.
                if (!QuorumPeerConfig.isReconfigEnabled()) {
                    LOG.debug("Reset config node content from local config after deserialization of snapshot.");
                    zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
                }
                String signature = leaderIs.readString("signature");
                if (!signature.equals("BenWasHere")) {
                    LOG.error("Missing signature. Got " + signature);
                    throw new IOException("Missing signature");                   
                }
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

                // immediately persist the latest snapshot when there is txn log gap
                syncSnapshot = true;
            } else if (qp.getType() == Leader.TRUNC) {
                //we need to truncate the log to the lastzxid of the leader
                LOG.warn("Truncating log to get in sync with the leader 0x"
                        + Long.toHexString(qp.getZxid()));
                boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                if (!truncated) {
                    // not able to truncate the log
                    LOG.error("Not able to truncate the log "
                            + Long.toHexString(qp.getZxid()));
                    System.exit(13);
                }
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

            }
            else {
                LOG.error("Got unexpected packet from leader: {}, exiting ... ",
                          LearnerHandler.packetToString(qp));
                System.exit(13);

            }
            zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
            zk.createSessionTracker();            
            
            long lastQueued = 0;

            // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
            // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
            // we need to make sure that we don't take the snapshot twice.
            boolean isPreZAB1_0 = true;
            //If we are not going to take the snapshot be sure the transactions are not applied in memory
            // but written out to the transaction log
            boolean writeToTxnLog = !snapshotNeeded;
            // we are now going to start getting transactions to apply followed by an UPTODATE
            outerLoop:
            while (self.isRunning()) {
                readPacket(qp);
                switch(qp.getType()) {
                case Leader.PROPOSAL:
                    PacketInFlight pif = new PacketInFlight();
                    pif.hdr = new TxnHeader();
                    pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                    if (pif.hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                            + Long.toHexString(pif.hdr.getZxid())
                            + " expected 0x"
                            + Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = pif.hdr.getZxid();
                    
                    if (pif.hdr.getType() == OpCode.reconfig){                
                        SetDataTxn setDataTxn = (SetDataTxn) pif.rec;       
                       QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
                       self.setLastSeenQuorumVerifier(qv, true);                               
                    }
                    
                    packetsNotCommitted.add(pif);
                    break;
                case Leader.COMMIT:
                case Leader.COMMITANDACTIVATE:
                    pif = packetsNotCommitted.peekFirst();
                    if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
                        boolean majorChange = self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(),
                                qp.getZxid(), true);
                        if (majorChange) {
                            throw new Exception("changes proposed in reconfig");
                        }
                    }
                    if (!writeToTxnLog) {
                        if (pif.hdr.getZxid() != qp.getZxid()) {
                            LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                        } else {
                            zk.processTxn(pif.hdr, pif.rec);
                            packetsNotCommitted.remove();
                        }
                    } else {
                        packetsCommitted.add(qp.getZxid());
                    }
                    break;
                case Leader.INFORM:
                case Leader.INFORMANDACTIVATE:
                    PacketInFlight packet = new PacketInFlight();
                    packet.hdr = new TxnHeader();

                    if (qp.getType() == Leader.INFORMANDACTIVATE) {
                        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
                        long suggestedLeaderId = buffer.getLong();
                        byte[] remainingdata = new byte[buffer.remaining()];
                        buffer.get(remainingdata);
                        packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr);
                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)packet.rec).getData()));
                        boolean majorChange =
                                self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
                        if (majorChange) {
                            throw new Exception("changes proposed in reconfig");
                        }
                    } else {
                        packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                        // Log warning message if txn comes out-of-order
                        if (packet.hdr.getZxid() != lastQueued + 1) {
                            LOG.warn("Got zxid 0x"
                                    + Long.toHexString(packet.hdr.getZxid())
                                    + " expected 0x"
                                    + Long.toHexString(lastQueued + 1));
                        }
                        lastQueued = packet.hdr.getZxid();
                    }
                    if (!writeToTxnLog) {
                        // Apply to db directly if we haven't taken the snapshot
                        zk.processTxn(packet.hdr, packet.rec);
                    } else {
                        packetsNotCommitted.add(packet);
                        packetsCommitted.add(qp.getZxid());
                    }

                    break;                
                case Leader.UPTODATE:
                    LOG.info("Learner received UPTODATE message");                                      
                    if (newLeaderQV!=null) {
                       boolean majorChange =
                           self.processReconfig(newLeaderQV, null, null, true);
                       if (majorChange) {
                           throw new Exception("changes proposed in reconfig");
                       }
                    }
                    if (isPreZAB1_0) {
                        zk.takeSnapshot(syncSnapshot);
                        self.setCurrentEpoch(newEpoch);
                    }
                    self.setZooKeeperServer(zk);
                    self.adminServer.setZooKeeperServer(zk);
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                    // means this is Zab 1.0
                   LOG.info("Learner received NEWLEADER message");
                   if (qp.getData()!=null && qp.getData().length > 1) {
                       try {                       
                           QuorumVerifier qv = self.configFromString(new String(qp.getData()));
                           self.setLastSeenQuorumVerifier(qv, true);
                           newLeaderQV = qv;
                       } catch (Exception e) {
                           e.printStackTrace();
                       }
                   }

                   if (snapshotNeeded) {
                       zk.takeSnapshot(syncSnapshot);
                   }
                   
                    self.setCurrentEpoch(newEpoch);
                    writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                    isPreZAB1_0 = false;
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }
        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
        writePacket(ack, true);
        sock.setSoTimeout(self.tickTime * self.syncLimit);
        zk.startup();
        /*
         * Update the election vote here to ensure that all members of the
         * ensemble report the same vote to new servers that start up and
         * send leader election notifications to the ensemble.
         * 
         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
         */
        self.updateElectionVote(newEpoch);

        // We need to log the stuff that came in between the snapshot and the uptodate
        if (zk instanceof FollowerZooKeeperServer) {
            FollowerZooKeeperServer fzk = (FollowerZooKeeperServer)zk;
            for(PacketInFlight p: packetsNotCommitted) {
                fzk.logRequest(p.hdr, p.rec);
            }
            for(Long zxid: packetsCommitted) {
                fzk.commit(zxid);
            }
        } else if (zk instanceof ObserverZooKeeperServer) {
            // Similar to follower, we need to log requests between the snapshot
            // and UPTODATE
            ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
            for (PacketInFlight p : packetsNotCommitted) {
                Long zxid = packetsCommitted.peekFirst();
                if (p.hdr.getZxid() != zxid) {
                    // log warning message if there is no matching commit
                    // old leader send outstanding proposal to observer
                    LOG.warn("Committing " + Long.toHexString(zxid)
                            + ", but next proposal is "
                            + Long.toHexString(p.hdr.getZxid()));
                    continue;
                }
                packetsCommitted.remove();
                Request request = new Request(null, p.hdr.getClientId(),
                        p.hdr.getCxid(), p.hdr.getType(), null, null);
                request.setTxn(p.rec);
                request.setHdr(p.hdr);
                ozk.commitRequest(request);
            }
        } else {
            // New server type need to handle in-flight packets
            throw new UnsupportedOperationException("Unknown server type");
        }
    }

The overall flow: the learner first checks the type of the first packet it receives (DIFF, SNAP, or TRUNC) and prepares accordingly. It then keeps reading sync packets until it receives NEWLEADER, answers with an ACK, and waits for the leader's UPTODATE message, which marks the end of synchronization. It then sends one more ACK so the leader knows, and starts its zkServer to serve clients.
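
Putting the two sides together, the exchange reflected in the code above looks roughly like this:

    Leader                                  Learner
      |  <---------- ACKEPOCH ----------------  |
      |  ----- DIFF / SNAP / TRUNC ---------->  |   pick the sync mode
      |  ----- PROPOSAL + COMMIT (x N) ------>  |   incremental data
      |  ----- NEWLEADER -------------------->  |
      |  <---------- ACK ---------------------  |
      |  ----- UPTODATE --------------------->  |   sync finished
      |  <---------- ACK ---------------------  |   learner starts serving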

Thanks for reading! I'm Monica23334 || Monica2333, and I've pledged to publish one original article every week - follow me and hold me to it~
