zk 數(shù)據(jù)和存儲

數(shù)據(jù)分類

整體分為 3 類:

  • 內(nèi)存數(shù)據(jù)

  • 磁盤數(shù)據(jù): 磁盤數(shù)據(jù)又分為

       1. 快照
       2. 事務(wù)日志
    

在zk啟動過程中抑堡,3類數(shù)據(jù)之間的關(guān)系為:


image.png

內(nèi)存數(shù)據(jù)

ZK 的數(shù)據(jù)模型:樹
樹中單個節(jié)點(diǎn)包含的內(nèi)容:

節(jié)點(diǎn)數(shù)據(jù)
節(jié)點(diǎn) ACL 信息
節(jié)點(diǎn)的路徑

在Zookeeper中,數(shù)據(jù)存儲分為兩部分:內(nèi)存數(shù)據(jù)存儲和磁盤數(shù)據(jù)存儲,我們主要分析服務(wù)器啟動時內(nèi)存數(shù)據(jù)庫的初始化過程和主從服務(wù)器數(shù)據(jù)同步的過程,先介紹一下涉及的基本類
DataTree
Zookeeper的數(shù)據(jù)模型是一棵樹簇爆,DataTree是內(nèi)存數(shù)據(jù)存儲的核心在孝,代表了內(nèi)存中一份完整的數(shù)據(jù)(最新),包括所有的節(jié)點(diǎn)路徑浆竭,節(jié)點(diǎn)數(shù)據(jù)和ACL信息,對應(yīng)watches等。類的主要屬性為:

 //節(jié)點(diǎn)路徑為key,節(jié)點(diǎn)數(shù)據(jù)內(nèi)容DataNode為value.實時存儲了所有的zk節(jié)點(diǎn)邦泄,使用ConcurrentHashMap保證并發(fā)性
    private final ConcurrentHashMap<String, DataNode> nodes =new ConcurrentHashMap<String, DataNode>();

//節(jié)點(diǎn)數(shù)據(jù)對應(yīng)的watch
    private final WatchManager dataWatches = new WatchManager();

//節(jié)點(diǎn)路徑對應(yīng)的watch
    private final WatchManager childWatches = new WatchManager();

//key為sessionId,value為該會話對應(yīng)的臨時節(jié)點(diǎn)路徑删窒,方便實時訪問和清理
    private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();

//This set contains the paths of all container nodes
    private final Set<String> containers =Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

//This set contains the paths of all ttl nodes
    private final Set<String> ttls =Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
//內(nèi)存數(shù)據(jù)庫的最大zxid
public volatile long lastProcessedZxid = 0;

DataNode
數(shù)據(jù)存儲的最小單元,包含節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容顺囊,節(jié)點(diǎn)狀態(tài)肌索,子節(jié)點(diǎn)列表,以及對子節(jié)點(diǎn)的操作接口等特碳,主要屬性為:

//節(jié)點(diǎn)內(nèi)容
    byte data[];
    Long acl;
    //節(jié)點(diǎn)狀態(tài)诚亚,包括一些節(jié)點(diǎn)的元數(shù)據(jù),如ephemeralOwner午乓,czxid等
    public StatPersisted stat;
//子節(jié)點(diǎn)相對父節(jié)點(diǎn)路徑集合站宗,不包括父節(jié)點(diǎn)路徑
    private Set<String> children = null;

image.png

拋出 2 個問題:

1. DataTree 中 nodes 是 Map,表示所有的 ZK 節(jié)點(diǎn)益愈,那其內(nèi)部 key 是什么梢灭?
Re:ZNode 的唯一標(biāo)識 path 作為 key
2. ephemerals 是Map,用于存儲臨時節(jié)點(diǎn)腕唧,那其內(nèi)部 key 是什么或辖?value 又是什么?
Re:臨時節(jié)點(diǎn)是跟 Session 綁定的枣接,sessionId 作為 key

ZKDatabase
Zookeeper的內(nèi)存數(shù)據(jù)庫颂暇,負(fù)責(zé)管理Zookeeper的所有會話,DataTree存儲和事務(wù)日志但惶。它會定時向磁盤dump快照數(shù)據(jù)(snapCount主要控制)耳鸯,服務(wù)器啟動時,會通過磁盤上的事務(wù)日志和快照數(shù)據(jù)文件恢復(fù)成完整的內(nèi)存數(shù)據(jù)庫膀曾。主要屬性為:

    protected DataTree dataTree;
//key為sessionId,value為會話過期時間
    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
//用于和磁盤交互事務(wù)日志文件和快照文件的類
    protected FileTxnSnapLog snapLog;
//主從數(shù)據(jù)同步時使用
    protected long minCommittedLog, maxCommittedLog;
    public static final int commitLogCount = 500;
    protected static int commitLogBuffer = 700;
    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();

zookeeper內(nèi)存數(shù)據(jù)庫的兩種持久化方式:
FileTxnLog:事務(wù)日志文件县爬,以日志追加的方式維護(hù)。
FileSnap:內(nèi)存快照文件添谊,保存內(nèi)存數(shù)據(jù)庫某一時刻的狀態(tài)财喳,所以數(shù)據(jù)不一定是最新的。

 TxnLog:是一個讀取日志的接口斩狱,提供了讀取事務(wù)log的接口方法耳高。
  SnapShot:是一個操作日志快照的接口,提供了對快照文件操作的方法所踊。
  FileTxnLog:實現(xiàn)TxnLog接口泌枪,提供了讀取事務(wù)日志的方法實現(xiàn)。
  FileSnap:實現(xiàn)Snapshot接口秕岛,負(fù)責(zé)存儲碌燕、序列化误证、反序列化、訪問快照修壕。
  FileTxnSnapLog愈捅,封裝了TxnLog和SnapShot。
  Util叠殷,工具類改鲫,提供持久化所需的API。

我們可以看到林束,zookeeper總共提供了兩種持久化文件,分別是內(nèi)存快照SnapShot和事務(wù)日志TxnLog(這種日志有點(diǎn)類似MySQL數(shù)據(jù)庫中的binlog稽亏,zookeeper會把所有涉及到修改內(nèi)存數(shù)據(jù)結(jié)構(gòu)的操作日志記錄到該log中壶冒,也就是說,zookeeper會把每一個事務(wù)操作諸如添加截歉、刪除都會記錄到這個日志文件中胖腾,當(dāng)zookeeper出現(xiàn)異常時,可以借助該事務(wù)日志進(jìn)行數(shù)據(jù)恢復(fù))瘪松。
日志文件它主要是負(fù)責(zé)實時記錄對服務(wù)端的每一次的事務(wù)操作日志(這里講的事務(wù)和數(shù)據(jù)庫中事務(wù)不一樣咸作,它是指涉及到對服務(wù)器端的內(nèi)存數(shù)據(jù)庫的增刪改這種會變更內(nèi)存數(shù)據(jù)庫的操作行為,不記錄查詢)
通過這種日志文件宵睦,當(dāng)zookeeper因為故障而發(fā)生重啟時记罚,我們就可以根據(jù)內(nèi)存快照文件和事務(wù)日志使得內(nèi)存數(shù)據(jù)庫恢復(fù)最新的數(shù)據(jù)庫狀態(tài)。在zookeeper中壳嚎,F(xiàn)ileTxnLog就是負(fù)責(zé)日志文件持久化的邏輯對象桐智,它是TxnLog的一個實現(xiàn)類。它會通過在分配內(nèi)存時會預(yù)分配固定大的內(nèi)存大醒滔凇说庭;同時保證每次寫的時候都是直接追加順序?qū)懭耄瑥亩WC日志文件的性能郑趁。
首先我們來看下TxnLog它給我們哪些方法:

  void rollLog() throws IOException;// 滾動日志刊驴,從當(dāng)前日志滾到下一個日志,不是回滾
  boolean append(TxnHeader hdr, Record r) throws IOException;//追加一個請求至事務(wù)性日志
  TxnIterator read(long zxid) throws IOException;// 可迭代讀取事務(wù)性日志
  long getLastLoggedZxid() throws IOException;//事務(wù)性操作的最新zxid
  boolean truncate(long zxid) throws IOException;// 清空zxid以后的日志
  long getDbId() throws IOException;// 獲取數(shù)據(jù)庫的id
  void commit() throws IOException;// 提交事務(wù)并進(jìn)行確認(rèn)
  void close() throws IOException;// 關(guān)閉事務(wù)性日志

FileTxnLog是TxnLog的一個實現(xiàn)類寡润,所以它也就負(fù)責(zé)了實現(xiàn)該接口上的方法捆憎,我們來看下它是怎么實現(xiàn)的,對于一個日志文件悦穿,特別要關(guān)注的是它的讀寫操作的性能攻礼。

FileTxnLog
實現(xiàn)了TxnLog接口,提供了API可以獲取日志和寫入日志栗柒,
首先先看一下事務(wù)日志文件的格式

LogFile:
//一個日志文件由以下三個部分組成
 *     FileHeader TxnList ZeroPad
//1.文件頭
 * FileHeader: {
 *     magic 4bytes (ZKLG)   
 *     version 4bytes
 *     dbid 8bytes
 *   }
 //事務(wù)內(nèi)容
 * TxnList:
 *     Txn || Txn TxnList

 * Txn:
//一條事務(wù)日志的組成部分
 *     checksum Txnlen TxnHeader Record 0x42

 * checksum: 8bytes Adler32 is currently used
 *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
 *
 * Txnlen:
 *     len 4bytes
 *
 * TxnHeader: {
 *     sessionid 8bytes
 *     cxid 4bytes
 *     zxid 8bytes
 *     time 8bytes
 *     type 4bytes
 *   }
 *
 * Record:
 *     See Jute definition file for details on the various record types
 *
 * ZeroPad:
 *     0 padded to EOF (filled during preallocation stage)
public class FileTxnLog implements TxnLog {
    private static final Logger LOG;
    //預(yù)分配64m大小
    static long preAllocSize =  65536 * 1024;
    //直接內(nèi)存
    private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
    //魔數(shù)礁扮,用于校驗日志文件的正確性知举,默認(rèn)為1514884167
    public final static int TXNLOG_MAGIC =ByteBuffer.wrap("ZKLG".getBytes()).getInt();
    public final static int VERSION = 2;
    //日志文件名好的前綴
    public static final String LOG_FILE_PREFIX = "log";
    /** Maximum time we allow for elapsed fsync before WARNing */
    private final static long fsyncWarningThresholdMS;
    static {
        LOG = LoggerFactory.getLogger(FileTxnLog.class);
        //獲得系統(tǒng)參數(shù),判斷系統(tǒng)參數(shù)配置了預(yù)分配內(nèi)存大小
        String size = System.getProperty("zookeeper.preAllocSize");
        if (size != null) {
            try {
                preAllocSize = Long.parseLong(size) * 1024;
            } catch (NumberFormatException e) {
                LOG.warn(size + " is not a valid value for preAllocSize");
            }
        }
        /** Local variable to read fsync.warningthresholdms into */
        Long fsyncWarningThreshold;
        if ((fsyncWarningThreshold = Long.getLong("zookeeper.fsync.warningthresholdms")) == null)
            fsyncWarningThreshold = Long.getLong("fsync.warningthresholdms", 1000);
        fsyncWarningThresholdMS = fsyncWarningThreshold;
    }
    // 最大(也就是最新)的zxid
    long lastZxidSeen;
    volatile BufferedOutputStream logStream = null;
    volatile OutputArchive oa;
    volatile FileOutputStream fos = null;
    //log目錄文件
    File logDir;
    //是否強(qiáng)制同步太伊,默認(rèn)是yes
    private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");
    long dbId;
    private LinkedList<FileOutputStream> streamsToFlush =
        new LinkedList<FileOutputStream>();
    // 當(dāng)前配置的大小
    long currentSize;
    // 寫日志文件
    File logFileWrite = null;

    private volatile long syncElapsedMS = -1L;
}

FileTxnLog方法

append: 主要是負(fù)責(zé)日志追加雇锡,在對日志文件的寫操作時,zookeeper主要是通過日志追加的方法

public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        //校驗頭部不能為空,hdr主要包含了czxid僚焦、clientId锰提、zxid等相關(guān)信息
        if (hdr == null) {
            return false;
        }
        //如果待寫入的事務(wù)的事務(wù)id小于本地保存的最新的事務(wù)id,給提醒
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        //在第一次新建一個FileTxnLog時候芳悲,logStream還是空的立肘,這個時候需要為它創(chuàng)建一個新的日志文件,并把logStream指向這個日志文件
        if (logStream==null) {
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }
           //根據(jù)待寫入的事務(wù)id創(chuàng)建一個新的日志文件名扛,我們可以看到文件名包含這個文件存放的事務(wù)的最小事務(wù)id
           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           oa = BinaryOutputArchive.getArchive(logStream);
           //根據(jù)魔數(shù)谅年、版本號和數(shù)據(jù)庫id生成日志文件頭,dbId默認(rèn)是0
           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
           fhdr.serialize(oa, "fileheader");
           // 確保在用0填充之前,先把魔數(shù)信息等寫入到文件中肮韧,進(jìn)行依次flush
           logStream.flush();
           currentSize = fos.getChannel().position();//獲取當(dāng)前文件流中的大小
           streamsToFlush.add(fos);
        }
        //重新計算文件大小融蹂,保證文件的大小是預(yù)分配大小的整數(shù)倍
        //可以讓文件盡可能的占用連續(xù)的磁盤扇區(qū),減少后續(xù)寫入和讀取文件時的磁盤尋道開銷弄企;
        //迅速占用磁盤空間超燃,防止使用過程中所需空間不足
        currentSize = padFile(fos.getChannel());
        //序列化TxnHeader Record記錄到byte[]
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
        //創(chuàng)建校驗和的算法,默認(rèn)是Adler32
        //Adler-32可用于計算數(shù)據(jù)流的校驗和 校驗和拘领,幾乎與 CRC-32 一樣可靠意乓,但是能夠更快地計算出來
        Checksum crc = makeChecksumAlgorithm();
        //使用指定的字節(jié)數(shù)組更新校驗和
        crc.update(buf, 0, buf.length);
      //將更新的校驗和值寫入到日志文件中
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        //將TxnHeader Record數(shù)據(jù)寫入到輸出流
        Util.writeTxnBytes(oa, buf);
        return true;
    }
   //1.先計算buf數(shù)據(jù)長度寫入
        //2.寫入buf數(shù)組數(shù)據(jù)
        // 3.記錄尾部以’B’字符結(jié)尾,寫入0x42
     public static void writeTxnBytes(OutputArchive oa, byte[] bytes)
            throws IOException {
        oa.writeBuffer(bytes, "txnEntry");
        oa.writeByte((byte) 0x42, "EOR"); // 'B'
    }

我們再看看下它是怎么計算和填充為文件分配的大小

 private long padFile(FileChannel fileChannel) throws IOException {
        //計算新的文件大小院究,并通過填充0先占用未使用的byte空間洽瞬,
        //這樣可以讓文件盡可能的占用連續(xù)的磁盤扇區(qū),減少后續(xù)寫入和讀取文件時的磁盤尋道開銷
        //  currentSize默認(rèn)是0
        long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
        if (currentSize != newFileSize) {//將整個日志文件中未使用的部分填充0
            fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
            currentSize = newFileSize;
        }
        return currentSize;
    }

/***
     *
     * @param position 通過管道寫入的字節(jié)長度
     * @param fileSize 當(dāng)前設(shè)置的文件大小
     * @param preAllocSize 與分配大小
     * @return
     */
    public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
        // I如果剩余空間不足4k且預(yù)分配空間大于0
        if (preAllocSize > 0 && position + 4096 >= fileSize) {
            //如果已寫入的長度超過了文件大小业汰,文件大小擴(kuò)為 寫入的字節(jié)長度+預(yù)分配的大小
            if (position > fileSize){//剛創(chuàng)建的時候肯定走這個伙窃,這樣就可以保證fileSize始終是preAllocSize的整數(shù)倍
                fileSize = position + preAllocSize;
                //這邊會重新調(diào)整到預(yù)分配塊長度的整數(shù)倍(是否是為了方便管理統(tǒng)計等?)
                fileSize -= fileSize % preAllocSize;
            } else {
                fileSize += preAllocSize;
            }
        }

        return fileSize;
    }

現(xiàn)在我們對FileTxnLog文件的寫應(yīng)該有一定的了解样漆。也知道为障,在文件新建的時候會預(yù)分配文件內(nèi)存大小,并用0來填充放祟,從而保證文件的磁盤占用是連續(xù)的鳍怨,同時通過日志追加的方式,我們可以保證對日志文件的寫的順序性跪妥,從而保證了寫性能鞋喇;我們也可以到,每次將事務(wù)寫入到日志文件中時眉撵,都會先根據(jù)寫入的事務(wù)計算并寫入一個校驗和侦香,然后再把事務(wù)流寫入到日志文件中落塑,這樣可以充分保證事務(wù)日志的安全性和完整性。

read:看完寫文件操作罐韩,我們當(dāng)然想知道讀文件的操作憾赁。因為讀寫是一一對應(yīng)的。文件的讀取散吵,zookeepeer給我們提供了兩種重載的方法:

/***
   * zxid:指定迭代讀取日志文件中的第一個事務(wù)ID
    * 默認(rèn)fastForward=true
   */
  TxnIterator read(long zxid)
   /***
   * zxid:指定迭代讀取日志文件中的第一個事務(wù)ID
   * fastForward:
   *        true:則返回的迭代器只包含大于等于zxid的事務(wù)
   *        fasle:則返回的迭代器除了包含大于等于zxid的事務(wù)龙考,還包含了跟ZXID同一個日志文件的ZXID
   */
  TxnIterator read(long zxid, boolean fastForward)
 public TxnIterator read(long zxid, boolean fastForward) throws IOException {
        return new FileTxnIterator(logDir, zxid, fastForward);
    }
 /***
     * 讀取事務(wù)日志, 這個方法在服務(wù)當(dāng)機(jī)恢復(fù)的時候矾睦,用來遍歷事務(wù)日志來恢復(fù)數(shù)據(jù)
     * 根據(jù)目標(biāo)事務(wù)zxid晦款,從日志文件中讀取大于該事務(wù)id的事務(wù),并返回這些事務(wù)構(gòu)成的迭代器TxnIterator
     * 注意底層在遍歷每一個日志文件的時候顷锰,會對文件進(jìn)行魔數(shù)校驗等柬赐,避免文件被損壞
     * @param zxid 迭代器的開始位置
     * @return
     * @throws IOException
     */
   public FileTxnIterator(File logDir, long zxid, boolean fastForward)
                throws IOException {
            this.logDir = logDir;//日志文件存放目錄
            this.zxid = zxid;//目標(biāo)事務(wù)ID 
            init();
            //在init()方法里,我們拿到目標(biāo)文件的第一個事務(wù)ID官紫,這個時候如果fastForward 是true的話,就要繼續(xù)往下遍歷州藕,找出目標(biāo)zxid的事務(wù)束世,才進(jìn)行停止。
            //這里要注意hdr是上一次遍歷的事務(wù)頭
            if (fastForward && hdr != null) {
                while (hdr.getZxid() < zxid) {
                    if (!next())
                        break;
                }
            }
        }
  
    void init() throws IOException {
            storedFiles = new ArrayList<File>();
          //排序目錄下的日志文件床玻,我們知道文件名稱是根據(jù)事務(wù)id來創(chuàng)建的毁涉,所以,文件的排序也等價于事務(wù)的排序
            List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
            for (File f: files) {
                //找出起始事務(wù)ID大于ZXID的日志文件
                if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
                    storedFiles.add(f);
                }
                //當(dāng)?shù)谝淮伪闅v到起始ID小于ZXID的日志文件后锈死,要記得把該文件也作為查找目標(biāo)文件贫堰,因為它里面可能包含大于ZXID的事務(wù)。
                //同時停止遍歷待牵,因為后面繼續(xù)遍歷下去也沒意思其屏,都是小于ZXID的日志文件。
                else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
                    storedFiles.add(f);
                    break;
                }
            }
          //找出已排好序且可能存在大于ZXID的日志文件后缨该,打開第一個日志文件輸入流準(zhǔn)備讀取
            goToNextLog();
            //注意這個時候調(diào)用next()只是獲取第一個日志文件中的第一個事務(wù)ID偎行,該事務(wù)ID并不一定是ZXID。
            next();
        }
    //開始讀取下一個事務(wù)
     public boolean next() throws IOException {
            if (ia == null) {
                return false;
            }
            try {
                //讀取校驗和的值
                long crcValue = ia.readLong("crcvalue");
                //讀取事務(wù)
                byte[] bytes = Util.readTxnBytes(ia);
                //因為我們是采用預(yù)分配內(nèi)存方式贰拿,會定義一個EOF作為空的事務(wù)蛤袒。所以,當(dāng)我們讀取到一個空的膨更,也就表明日志文件已讀到末尾
                if (bytes == null || bytes.length==0) {
                    throw new EOFException("Failed to read " + logFile);
                }
                //分析校驗和的值是否正確妙真,防止消息被破壞。這就是為什么我們在append的時候要加入校驗和
                Checksum crc = makeChecksumAlgorithm();
                crc.update(bytes, 0, bytes.length);
                if (crcValue != crc.getValue())
                    throw new IOException(CRC_ERROR);
                //反序列事務(wù)
                hdr = new TxnHeader();
                record = SerializeUtils.deserializeTxn(bytes, hdr);
            } catch (EOFException e) {
                LOG.debug("EOF exception " + e);
                inputStream.close();
                inputStream = null;
                ia = null;
                hdr = null;
                //日志文件已讀到末尾了荚守,所以要跳到下一個文件開始讀取
                if (!goToNextLog()) {
                    return false;
                }
                // if we went to the next log file, we should call next() again
                return next();
            } catch (IOException e) {
                inputStream.close();
                throw e;
            }
            return true;
        }

FileTxnLog除了提供對事務(wù)日志的讀寫之外珍德,還提供了其它的一些額外方法练般,下面我們繼續(xù)看些這些方法
getLogFiles:獲取可能包含比目標(biāo)事務(wù)ID大的日志文件的數(shù)組(服務(wù)器啟動并恢復(fù)內(nèi)存數(shù)據(jù)庫的時候會調(diào)用這個方法進(jìn)行內(nèi)存數(shù)據(jù)庫恢復(fù))

/***
     *
     * @param logDirList 日志文件列表
     * @param snapshotZxid  通過內(nèi)存快照恢復(fù)的最大的事務(wù)id,剩余的比snapshotZxid就要從日志文件里恢復(fù)
     * @return 找出比包含有<=snapshotZxid的事務(wù)id的日志文件列表,當(dāng)snapshotZxid=0時菱阵,獲取所有的文件
     */
    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
        //對日志文件進(jìn)行排序踢俄,按照事務(wù)ID從高到低
        List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
        long logZxid = 0;
        // Find the log file that starts before or at the same time as the
        // zxid of the snapshot
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            //如果文件名的事務(wù)id:fzxid>快照的最新的zxid
            if (fzxid > snapshotZxid) {
                continue;
            }
            //如果fzxid <= snapshotZxid && fzxid > logZxid
            if (fzxid > logZxid) {
                logZxid = fzxid;
            }
        }
        List<File> v=new ArrayList<File>(5);
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            if (fzxid < logZxid) {//找出文件id大于logZxid的文件名
                continue;
            }
            v.add(f);
        }
        return v.toArray(new File[0]);

    }

getLastLoggedZxid獲取最新的事務(wù)ID

   //該方法只在節(jié)點(diǎn)服務(wù)器啟動的時候被調(diào)用
    public long getLastLoggedZxid() {//從日志文件中獲取最大的Zxid
        //找出所有的日志文件并排序(其實可以排序后拿第一個就好了啊晴及?)
        File[] files = getLogFiles(logDir.listFiles(), 0);
        //排序日志文件都办,并從日志文件名稱上獲取包含最大zxid的日志文件的文件名中的日志id
        long maxLog=files.length>0?
                Util.getZxidFromName(files[files.length-1].getName(),LOG_FILE_PREFIX):-1;

        // 在最新的日志文件里迭代查找最新的事務(wù)ID
        long zxid = maxLog;
        TxnIterator itr = null;
        try {
            FileTxnLog txn = new FileTxnLog(logDir);
            //根據(jù)文件名的事務(wù)id遍歷迭代該日志文件,獲取整個內(nèi)存數(shù)據(jù)庫的最大事務(wù)id,
            itr = txn.read(maxLog);
            while (true) {
                if(!itr.next())
                    break;
                TxnHeader hdr = itr.getHeader();
                zxid = hdr.getZxid();
            }
        } catch (IOException e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            close(itr);
        }
        return zxid;
    }

commit方法虑稼,提交日志并刷至磁盤琳钉,force方法會把所有未寫磁盤的數(shù)據(jù)都強(qiáng)制寫入磁盤。 這是因為在操作系統(tǒng)中出于性能考慮回把數(shù)據(jù)放入緩沖區(qū)蛛倦,所以不能保證數(shù)據(jù)在調(diào)用write寫入文件通道后就及時寫到磁盤上了歌懒,除非手動調(diào)用force方法。 force方法需要一個布爾參數(shù)溯壶,代表是否把meta data也一并強(qiáng)制寫入及皂。

/***
     * 提交事務(wù),確保日志刷新到磁盤中
     * @throws IOException
     */
    public synchronized void commit() throws IOException {
        if (logStream != null) {//刷新
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush();
            if (forceSync) {//是否強(qiáng)制刷盤
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "File size is " + channel.size() + " bytes. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) {
            streamsToFlush.removeFirst().close();
        }
    }

truncate:截斷刪除比zxid大的事務(wù)
runcate清空大于給定的zxid事務(wù)日志且改,集群版learner向leader同步的時候验烧,leader告訴learner需要回滾同步調(diào)用Learner#syncWithLeader

public boolean truncate(long zxid) throws IOException {
        FileTxnIterator itr = null;
        try {
            //找出大于zxid的事務(wù)迭代器
            itr = new FileTxnIterator(this.logDir, zxid);
            PositionInputStream input = itr.inputStream;
            if(input == null) {
                throw new IOException("No log files found to truncate! This could " +
                        "happen if you still have snapshots from an old setup or " +
                        "log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
            }
            long pos = input.getPosition();
            RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
            raf.setLength(pos);//把當(dāng)前l(fā)og后面的部分(比zxid更大的)截斷
            raf.close();
            while(itr.goToNextLog()) {
                if (!itr.logFile.delete()) {//把后面的log文件都刪除
                    LOG.warn("Unable to truncate {}", itr.logFile);
                }
            }
        } finally {
            close(itr);
        }
        return true;
    }

FileTxnIterator

在這里我們發(fā)現(xiàn)在根據(jù)zxid進(jìn)行read的時候會返回一個FileTxnIterator,所以有必要介紹這個FileTxnIterator

 public static class FileTxnIterator implements TxnLog.TxnIterator {
        File logDir;//日志文件存放目錄
        //開始讀取的起始zxid
        long zxid;//迭代器的開始zxid又跛,也就是這個迭代器主要是用來存放比我們要查找的zxid大的事務(wù)
        TxnHeader hdr;//事務(wù)頭
        Record record;
        File logFile;//當(dāng)前流指向的文件
        InputArchive ia;
        static final String CRC_ERROR="CRC check failed";

        PositionInputStream inputStream=null;
        //存放包含比我們需要查找的zxid大的事務(wù)id的日志文件列表
        private ArrayList<File> storedFiles;
}

構(gòu)造函數(shù)

public FileTxnIterator(File logDir, long zxid, boolean fastForward)
                throws IOException {
            this.logDir = logDir;
            this.zxid = zxid;
            //過濾出所有需要讀的日志文件碍拆,并利用goToNextLog()方法打開第一個日志日志文件的輸入流
            init();
            if (fastForward && hdr != null) {
                while (hdr.getZxid() < zxid) {
                    if (!next())
                        break;
                }
            }
        }

init方法中過濾出所有需要讀的日志文件

void init() throws IOException {
            //storedFiles按照事務(wù)id從大到小排序
            storedFiles = new ArrayList<File>();
            //排序日志文件
            List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
            for (File f: files) {//迭代日志文件并找出可能存在事務(wù)id大于zxid的日志文件
                if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
                    storedFiles.add(f);
                }
                // 當(dāng)執(zhí)行到這步,說明后面的日志都比給定的zxid小慨蓝,就沒必要繼續(xù)遍歷感混,直接break
                else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
                    storedFiles.add(f);
                    break;
                }
            }
            //獲打開第一個日志日志文件的輸入流,也就是zxid最小的
            goToNextLog();
            //next方法用來從日志文件中讀取一條記錄礼烈,校驗并反序列化出來弧满,
            //讀取成功返回true,如果讀到了文件末尾則調(diào)goToNextLog()讀下一個文件济丘,以此遞歸直到最后
            next();
        }

goToNextLog

//打開第一個日志文件輸入流
        private boolean goToNextLog() throws IOException {
            if (storedFiles.size() > 0) {
                this.logFile = storedFiles.remove(storedFiles.size()-1);
                ia = createInputArchive(this.logFile);
                return true;
            }
            return false;
        }


next
next方法用來從日志文件中讀取一條記錄谱秽,校驗并反序列化出來,讀取成功返回true摹迷,如果讀到了文件末尾調(diào)goToNextLog()讀下一個文件疟赊,以此遞歸直到最后

//讀取下一個事務(wù),并檢查事務(wù)的萬完整性峡碉,包括事務(wù)頭信息
        public boolean next() throws IOException {
            if (ia == null) {
                return false;
            }
            try {
                long crcValue = ia.readLong("crcvalue");
                byte[] bytes = Util.readTxnBytes(ia);
                // Since we preallocate, we define EOF to be an
                if (bytes == null || bytes.length==0) {
                    throw new EOFException("Failed to read " + logFile);
                }
                // 檢查文件是否被破壞
                Checksum crc = makeChecksumAlgorithm();
                crc.update(bytes, 0, bytes.length);
                if (crcValue != crc.getValue())
                    throw new IOException(CRC_ERROR);
                hdr = new TxnHeader();
                //反序列事務(wù)信息
                record = SerializeUtils.deserializeTxn(bytes, hdr);
            } catch (EOFException e) {
                LOG.debug("EOF exception " + e);
                inputStream.close();
                inputStream = null;
                ia = null;
                hdr = null;
                // 執(zhí)行到這邊意味著文件已經(jīng)讀到末尾了近哟,就要把留指向下一個文件
                if (!goToNextLog()) {
                    return false;
                }
                // if we went to the next log file, we should call next() again
                return next();
            } catch (IOException e) {
                inputStream.close();
                throw e;
            }
            return true;
        }

事務(wù)日志小結(jié):

事務(wù)日志頻繁 flush 到磁盤,消耗大量磁盤 IO
磁盤空間預(yù)分配:事務(wù)日志剩余空間 < 4KB 時鲫寄,將文件大小增加 64 MB
磁盤預(yù)分配的目標(biāo):減少磁盤 seek 次數(shù)
建議:事務(wù)日志吉执,采用獨(dú)立磁盤單獨(dú)存放
image.png

事務(wù)序列化:本質(zhì)是生成一個字節(jié)數(shù)組
包含:事務(wù)頭疯淫、事務(wù)體的序列化
事務(wù)體:會話創(chuàng)建事務(wù)、節(jié)點(diǎn)創(chuàng)建事務(wù)戳玫、節(jié)點(diǎn)刪除事務(wù)熙掺、節(jié)點(diǎn)數(shù)據(jù)更新事務(wù)

數(shù)據(jù)相關(guān)過程

ZK 服務(wù)器啟動時,首先會進(jìn)行數(shù)據(jù)初始化咕宿,將磁盤中數(shù)據(jù)币绩,加載到內(nèi)存中,恢復(fù)現(xiàn)場府阀。

image.png

數(shù)據(jù)同步

ZK 集群服務(wù)器啟動之后缆镣,會進(jìn)行 2 個動作:

  • 選舉 Leader:分配角色
  • Learner 向 Leader 服務(wù)器注冊:數(shù)據(jù)同步
    數(shù)據(jù)同步,本質(zhì):將沒有在 Learner 上執(zhí)行的事務(wù)试浙,同步給 Learner董瞻。
image.png

集群啟動后,什么時候能夠?qū)ν馓峁┓?wù)田巴?需要等所有 Learner 都完成數(shù)據(jù)同步嗎钠糊?
過半策略:只需要半數(shù) Learner 完成數(shù)據(jù)同步,Learder 向所有已經(jīng)完成數(shù)據(jù)同步的 Learner 發(fā)送 UPTODATE 命令壹哺,表示集群具備了對外服務(wù)能力

FREQ

問題1眠蚂、我們知道zookeeper每次生成的事務(wù)日志都帶有當(dāng)前文件的第一條事務(wù)的zxid,這有什么好處呢斗躏?
(1)它可以幫助我們快速的定位某一個事務(wù)操作所在的日志文件。
(2)我們知道昔脯,事務(wù)的zxid中高32位包含了epoch啄糙,這個是leader所屬的周期,因此這樣我們可以通過日志文件名就清楚的知道云稚,當(dāng)前運(yùn)行時的zookeeper所屬的leader周期隧饼。

問題2、在前面静陈,我們知道燕雁,每次append寫入事務(wù)的時,我們都會檢測事務(wù)文件日志當(dāng)前剩余的空間是否大于4kb鲸拥,如果小于4kb拐格,則會在現(xiàn)有的文件基礎(chǔ)上加上64MB,然后使用0來填充刑赶?那么為什么要使用這種預(yù)分配的形式呢捏浊?
我們都知道,對于客戶端每次的事務(wù)提交撞叨,都要將事務(wù)寫入到事務(wù)日志中金踪,所以事務(wù)日志寫入的性能決定了zookeeper對客戶端的請求的響應(yīng)浊洞。也就是說,事務(wù)每次的請求可以看作是一次對底層磁盤的IO操作胡岔。嚴(yán)格的講法希,文件的不斷追加寫入操作會觸發(fā)底層磁盤IO為文件不斷的開辟新的磁盤塊,即磁盤seek靶瘸。因此為了減少seek的頻率苫亦,從而提高zookeeper的IO響應(yīng)的時間,創(chuàng)建事務(wù)日志的時候都會進(jìn)行文件的預(yù)分配--在文件處建之時奕锌,就會向操作系統(tǒng)預(yù)分配一塊很大的磁盤塊著觉,默認(rèn)是64mb,而一旦分配的磁盤塊剩余的空間<4kb惊暴,則會再次分配饼丘,這樣就可以避免隨著每次事務(wù)的寫入過程中導(dǎo)致日志文件的不斷增長而需要不斷的觸發(fā)seek。事務(wù)預(yù)分配的大小辽话,可以通過系統(tǒng)參數(shù)zookeeper.preAllocsize來設(shè)置肄鸽。

問題3、事務(wù)日志文件是如何檢查一個事務(wù)日志文件的完整性呢油啤?
事務(wù)日志文件為了保證和檢查其文件的完整性和數(shù)據(jù)的準(zhǔn)確性典徘。zookeeper在每次事務(wù)操作寫入前,都會根據(jù)系列化的字節(jié)數(shù)組來計算checksum益咬,這樣當(dāng)我們重新載入事務(wù)的時候逮诲,就可以檢查這個事務(wù)文件的完整性了。zookeeper采用Adler32算法來計算checksum幽告。

問題4梅鹦、事務(wù)是什么時候刷盤的?
我們剛才講過冗锁,事務(wù)每次刷盤都是一次IO操作齐唆,所以為了減少刷盤的次數(shù),從而提高響應(yīng)性能冻河,zookeeper會將每次事務(wù)的請求寫入都是先寫到一個緩沖流中箍邮,而并非真正的刷盤到磁盤上去,那么在什么時候輸盤到磁盤中呢叨叙?zookeeper服務(wù)器在啟動的時候會單獨(dú)啟動一個requestProcessor線程來處理這個請求隊列queuedRequests锭弊,如果隊列里面有待處理的事務(wù)請求,則該線程將會取出隊列事務(wù)并寫入到事務(wù)日志文件中摔敛,這個時候的寫入是先寫入到一個緩沖流中廷蓉,當(dāng)requestProcessor統(tǒng)計寫入緩沖流的事務(wù)超過1000或者隊列已經(jīng)沒有事務(wù)了,則會開始將緩沖流中的數(shù)據(jù)刷到磁盤塊中。至于刷盤的方式是可選擇的桃犬,通過配置控制它是異步還是同步刷到磁盤中刹悴。

問題5、事務(wù)日志的截斷方法什么請下會觸發(fā)攒暇?
由于在zookeeper運(yùn)行中土匀,可能由于一些異常情況會導(dǎo)致learner的lastzxid比leader的還大,無論這種情況是怎么發(fā)生的形用,這都是一種不正常的現(xiàn)象就轧。為了遵循一個集群中,只要存在leader田度,那么所有機(jī)器都必須與該leader的數(shù)據(jù)進(jìn)行同步妒御,所以leader會向learner觸發(fā)truc方法,要求這個leaner對日志進(jìn)行截斷镇饺。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末乎莉,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子奸笤,更是在濱河造成了極大的恐慌惋啃,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件监右,死亡現(xiàn)場離奇詭異边灭,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)健盒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門绒瘦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人扣癣,你說我怎么就攤上這事椭坚。” “怎么了搏色?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長券册。 經(jīng)常有香客問我频轿,道長,這世上最難降的妖魔是什么烁焙? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任航邢,我火速辦了婚禮,結(jié)果婚禮上骄蝇,老公的妹妹穿的比我還像新娘膳殷。我一直安慰自己,他們只是感情好九火,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布赚窃。 她就那樣靜靜地躺著册招,像睡著了一般。 火紅的嫁衣襯著肌膚如雪勒极。 梳的紋絲不亂的頭發(fā)上是掰,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天,我揣著相機(jī)與錄音辱匿,去河邊找鬼键痛。 笑死,一個胖子當(dāng)著我的面吹牛匾七,可吹牛的內(nèi)容都是我干的絮短。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼昨忆,長吁一口氣:“原來是場噩夢啊……” “哼巢音!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起呢岗,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤本昏,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后痢缎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體胁勺,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年独旷,在試婚紗的時候發(fā)現(xiàn)自己被綠了署穗。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡嵌洼,死狀恐怖案疲,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情麻养,我是刑警寧澤褐啡,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站鳖昌,受9級特大地震影響备畦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜许昨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一懂盐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧糕档,春花似錦莉恼、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽尿背。三九已至,卻和暖如春悉患,著一層夾襖步出監(jiān)牢的瞬間残家,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工售躁, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留坞淮,地道東北人。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓陪捷,卻偏偏與公主長得像回窘,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子市袖,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容