2锨侯、數(shù)據(jù)和存儲-事務(wù)日志FileTxnLog

  在講內(nèi)存數(shù)據(jù)庫ZKDatabase之前,我們來先了解下zookeeper內(nèi)存數(shù)據(jù)庫的兩種持久化方式:
      FileTxnLog:事務(wù)日志文件冬殃,以日志追加的方式維護(hù)囚痴。
      FileSnap:內(nèi)存快照文件,保存內(nèi)存數(shù)據(jù)庫某一時刻的狀態(tài)审葬,所以數(shù)據(jù)不一定是最新的深滚。

然后我們再看一下zookeeper持久化總體的框架

clipboard.png

  TxnLog:是一個讀取日志的接口,提供了讀取事務(wù)log的接口方法涣觉。
  SnapShot:是一個操作日志快照的接口痴荐,提供了對快照文件操作的方法。
  FileTxnLog:實現(xiàn)TxnLog接口官册,提供了讀取事務(wù)日志的方法實現(xiàn)生兆。
  FileSnap:實現(xiàn)Snapshot接口,負(fù)責(zé)存儲攀隔、序列化皂贩、反序列化栖榨、訪問快照昆汹。
  FileTxnSnapLog,封裝了TxnLog和SnapShot婴栽。
  Util满粗,工具類,提供持久化所需的API愚争。

我們可以看到映皆,zookeeper總共提供了兩種持久化文件挤聘,分別是內(nèi)存快照SnapShot和事務(wù)日志TxnLog(這種日志有點類似MySQL數(shù)據(jù)庫中的binlog,zookeeper會把所有涉及到修改內(nèi)存數(shù)據(jù)結(jié)構(gòu)的操作日志記錄到該log中捅彻,也就是說组去,zookeeper會把每一個事務(wù)操作諸如添加、刪除都會記錄到這個日志文件中步淹,當(dāng)zookeeper出現(xiàn)異常時从隆,可以借助該事務(wù)日志進(jìn)行數(shù)據(jù)恢復(fù))。今天我們主要看下事務(wù)日志

事務(wù)日志FileTxnLog

我們知道缭裆,日志文件它主要是負(fù)責(zé)實時記錄對服務(wù)端的每一次的事務(wù)操作日志(這里講的事務(wù)和數(shù)據(jù)庫中事務(wù)不一樣键闺,它是指涉及到對服務(wù)器端的內(nèi)存數(shù)據(jù)庫的增刪改這種會變更內(nèi)存數(shù)據(jù)庫的操作行為,不記錄查詢)澈驼。通過這種日志文件辛燥,當(dāng)zookeeper因為故障而發(fā)生重啟時,我們就可以根據(jù)內(nèi)存快照文件和事務(wù)日志使得內(nèi)存數(shù)據(jù)庫恢復(fù)最新的數(shù)據(jù)庫狀態(tài)缝其。在zookeeper中挎塌,F(xiàn)ileTxnLog就是負(fù)責(zé)日志文件持久化的邏輯對象,它是TxnLog的一個實現(xiàn)類内边。它會通過在分配內(nèi)存時會預(yù)分配固定大的內(nèi)存大胁;同時保證每次寫的時候都是直接追加順序?qū)懭爰俨校瑥亩WC日志文件的性能缭贡。所以下面我們就開始來了解下FileTxnLog。

  首先我們來看下TxnLog它給我們哪些方法:
  void rollLog() throws IOException;// 滾動日志辉懒,從當(dāng)前日志滾到下一個日志阳惹,不是回滾
  boolean append(TxnHeader hdr, Record r) throws IOException;//追加一個請求至事務(wù)性日志
  TxnIterator read(long zxid) throws IOException;// 可迭代讀取事務(wù)性日志
  long getLastLoggedZxid() throws IOException;//事務(wù)性操作的最新zxid
  boolean truncate(long zxid) throws IOException;// 清空zxid以后的日志
  long getDbId() throws IOException;// 獲取數(shù)據(jù)庫的id
  void commit() throws IOException;// 提交事務(wù)并進(jìn)行確認(rèn)
  void close() throws IOException;// 關(guān)閉事務(wù)性日志

FileTxnLog是TxnLog的一個實現(xiàn)類,所以它也就負(fù)責(zé)了實現(xiàn)該接口上的方法眶俩,我們來看下它是怎么實現(xiàn)的莹汤,對于一個日志文件,特別要關(guān)注的是它的讀寫操作的性能颠印。

FileTxnLog的具體屬性

public class FileTxnLog implements TxnLog {
    private static final Logger LOG;
    //預(yù)分配64m大小
    static long preAllocSize =  65536 * 1024;
    //直接內(nèi)存
    private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
    //魔數(shù)纲岭,用于校驗日志文件的正確性,默認(rèn)為1514884167
    public final static int TXNLOG_MAGIC =ByteBuffer.wrap("ZKLG".getBytes()).getInt();
    public final static int VERSION = 2;
    //日志文件名好的前綴
    public static final String LOG_FILE_PREFIX = "log";
    /** Maximum time we allow for elapsed fsync before WARNing */
    private final static long fsyncWarningThresholdMS;
    static {
        LOG = LoggerFactory.getLogger(FileTxnLog.class);
        //獲得系統(tǒng)參數(shù)线罕,判斷系統(tǒng)參數(shù)配置了預(yù)分配內(nèi)存大小
        String size = System.getProperty("zookeeper.preAllocSize");
        if (size != null) {
            try {
                preAllocSize = Long.parseLong(size) * 1024;
            } catch (NumberFormatException e) {
                LOG.warn(size + " is not a valid value for preAllocSize");
            }
        }
        /** Local variable to read fsync.warningthresholdms into */
        Long fsyncWarningThreshold;
        if ((fsyncWarningThreshold = Long.getLong("zookeeper.fsync.warningthresholdms")) == null)
            fsyncWarningThreshold = Long.getLong("fsync.warningthresholdms", 1000);
        fsyncWarningThresholdMS = fsyncWarningThreshold;
    }
    // 最大(也就是最新)的zxid
    long lastZxidSeen;
    volatile BufferedOutputStream logStream = null;
    volatile OutputArchive oa;
    volatile FileOutputStream fos = null;
    //log目錄文件
    File logDir;
    //是否強(qiáng)制同步止潮,默認(rèn)是yes
    private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");
    long dbId;
    private LinkedList<FileOutputStream> streamsToFlush =
        new LinkedList<FileOutputStream>();
    // 當(dāng)前配置的大小
    long currentSize;
    // 寫日志文件
    File logFileWrite = null;

    private volatile long syncElapsedMS = -1L;
}

FileTxnLog方法

(1)append: 主要是負(fù)責(zé)日志追加,在對日志文件的寫操作時钞楼,zookeeper主要是通過日志追加的方法
public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        //校驗頭部不能為空,hdr主要包含了czxid喇闸、clientId、zxid等相關(guān)信息
        if (hdr == null) {
            return false;
        }
        //如果待寫入的事務(wù)的事務(wù)id小于本地保存的最新的事務(wù)id,給提醒
        if (hdr.getZxid() <= lastZxidSeen) {
            LOG.warn("Current zxid " + hdr.getZxid()
                    + " is <= " + lastZxidSeen + " for "
                    + hdr.getType());
        } else {
            lastZxidSeen = hdr.getZxid();
        }
        //在第一次新建一個FileTxnLog時候燃乍,logStream還是空的唆樊,這個時候需要為它創(chuàng)建一個新的日志文件,并把logStream指向這個日志文件
        if (logStream==null) {
           if(LOG.isInfoEnabled()){
                LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
           }
           //根據(jù)待寫入的事務(wù)id創(chuàng)建一個新的日志文件刻蟹,我們可以看到文件名包含這個文件存放的事務(wù)的最小事務(wù)id
           logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
           fos = new FileOutputStream(logFileWrite);
           logStream=new BufferedOutputStream(fos);
           oa = BinaryOutputArchive.getArchive(logStream);
           //根據(jù)魔數(shù)逗旁、版本號和數(shù)據(jù)庫id生成日志文件頭,dbId默認(rèn)是0
           FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
           fhdr.serialize(oa, "fileheader");
           // 確保在用0填充之前,先把魔數(shù)信息等寫入到文件中舆瘪,進(jìn)行依次flush
           logStream.flush();
           currentSize = fos.getChannel().position();//獲取當(dāng)前文件流中的大小
           streamsToFlush.add(fos);
        }
        //重新計算文件大小痢艺,保證文件的大小是預(yù)分配大小的整數(shù)倍
        //可以讓文件盡可能的占用連續(xù)的磁盤扇區(qū),減少后續(xù)寫入和讀取文件時的磁盤尋道開銷介陶;
        //迅速占用磁盤空間堤舒,防止使用過程中所需空間不足
        currentSize = padFile(fos.getChannel());
        //序列化TxnHeader Record記錄到byte[]
        byte[] buf = Util.marshallTxnEntry(hdr, txn);
        if (buf == null || buf.length == 0) {
            throw new IOException("Faulty serialization for header " +
                    "and txn");
        }
        //創(chuàng)建校驗和的算法,默認(rèn)是Adler32
        //Adler-32可用于計算數(shù)據(jù)流的校驗和 校驗和哺呜,幾乎與 CRC-32 一樣可靠舌缤,但是能夠更快地計算出來
        Checksum crc = makeChecksumAlgorithm();
        //使用指定的字節(jié)數(shù)組更新校驗和
        crc.update(buf, 0, buf.length);
      //將更新的校驗和值寫入到日志文件中
        oa.writeLong(crc.getValue(), "txnEntryCRC");
        //將TxnHeader Record數(shù)據(jù)寫入到輸出流
        Util.writeTxnBytes(oa, buf);
        return true;
    }
   //1.先計算buf數(shù)據(jù)長度寫入
        //2.寫入buf數(shù)組數(shù)據(jù)
        // 3.記錄尾部以’B’字符結(jié)尾,寫入0x42
     public static void writeTxnBytes(OutputArchive oa, byte[] bytes)
            throws IOException {
        oa.writeBuffer(bytes, "txnEntry");
        oa.writeByte((byte) 0x42, "EOR"); // 'B'
    }

我們再看看下它是怎么計算和填充為文件分配的大小

 private long padFile(FileChannel fileChannel) throws IOException {
        //計算新的文件大小某残,并通過填充0先占用未使用的byte空間国撵,
        //這樣可以讓文件盡可能的占用連續(xù)的磁盤扇區(qū),減少后續(xù)寫入和讀取文件時的磁盤尋道開銷
        //  currentSize默認(rèn)是0
        long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
        if (currentSize != newFileSize) {//將整個日志文件中未使用的部分填充0
            fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
            currentSize = newFileSize;
        }
        return currentSize;
    }

/***
     *
     * @param position 通過管道寫入的字節(jié)長度
     * @param fileSize 當(dāng)前設(shè)置的文件大小
     * @param preAllocSize 與分配大小
     * @return
     */
    public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
        // I如果剩余空間不足4k且預(yù)分配空間大于0
        if (preAllocSize > 0 && position + 4096 >= fileSize) {
            //如果已寫入的長度超過了文件大小玻墅,文件大小擴(kuò)為 寫入的字節(jié)長度+預(yù)分配的大小
            if (position > fileSize){//剛創(chuàng)建的時候肯定走這個介牙,這樣就可以保證fileSize始終是preAllocSize的整數(shù)倍
                fileSize = position + preAllocSize;
                //這邊會重新調(diào)整到預(yù)分配塊長度的整數(shù)倍(是否是為了方便管理統(tǒng)計等?)
                fileSize -= fileSize % preAllocSize;
            } else {
                fileSize += preAllocSize;
            }
        }

        return fileSize;
    }

現(xiàn)在我們對FileTxnLog文件的寫應(yīng)該有一定的了解澳厢。也知道环础,在文件新建的時候會預(yù)分配文件內(nèi)存大小,并用0來填充剩拢,從而保證文件的磁盤占用是連續(xù)的线得,同時通過日志追加的方式,我們可以保證對日志文件的寫的順序性徐伐,從而保證了寫性能贯钩;我們也可以到,每次將事務(wù)寫入到日志文件中時办素,都會先根據(jù)寫入的事務(wù)計算并寫入一個校驗和角雷,然后再把事務(wù)流寫入到日志文件中,這樣可以充分保證事務(wù)日志的安全性和完整性性穿。

(2)read:看完寫文件操作勺三,我們當(dāng)然想知道讀文件的操作。因為讀寫是一一對應(yīng)的季二。文件的讀取檩咱,zookeepeer給我們提供了兩種重載的方法:
  /***
   * zxid:指定迭代讀取日志文件中的第一個事務(wù)ID
    * 默認(rèn)fastForward=true
   */
  TxnIterator read(long zxid)
   /***
   * zxid:指定迭代讀取日志文件中的第一個事務(wù)ID
   * fastForward:
   *        true:則返回的迭代器只包含大于等于zxid的事務(wù)
   *        fasle:則返回的迭代器除了包含大于等于zxid的事務(wù)揭措,還包含了跟ZXID同一個日志文件的ZXID
   */
  TxnIterator read(long zxid, boolean fastForward)

下面我看下read具體是怎么實現(xiàn)的

    public TxnIterator read(long zxid, boolean fastForward) throws IOException {
        return new FileTxnIterator(logDir, zxid, fastForward);
    }
 /***
     * 讀取事務(wù)日志胯舷, 這個方法在服務(wù)當(dāng)機(jī)恢復(fù)的時候刻蚯,用來遍歷事務(wù)日志來恢復(fù)數(shù)據(jù)
     * 根據(jù)目標(biāo)事務(wù)zxid,從日志文件中讀取大于該事務(wù)id的事務(wù)桑嘶,并返回這些事務(wù)構(gòu)成的迭代器TxnIterator
     * 注意底層在遍歷每一個日志文件的時候炊汹,會對文件進(jìn)行魔數(shù)校驗等,避免文件被損壞
     * @param zxid 迭代器的開始位置
     * @return
     * @throws IOException
     */
   public FileTxnIterator(File logDir, long zxid, boolean fastForward)
                throws IOException {
            this.logDir = logDir;//日志文件存放目錄
            this.zxid = zxid;//目標(biāo)事務(wù)ID 
            init();
            //在init()方法里逃顶,我們拿到目標(biāo)文件的第一個事務(wù)ID讨便,這個時候如果fastForward 是true的話,就要繼續(xù)往下遍歷以政,找出目標(biāo)zxid的事務(wù)霸褒,才進(jìn)行停止。
            //這里要注意hdr是上一次遍歷的事務(wù)頭
            if (fastForward && hdr != null) {
                while (hdr.getZxid() < zxid) {
                    if (!next())
                        break;
                }
            }
        }
  
    void init() throws IOException {
            storedFiles = new ArrayList<File>();
          //排序目錄下的日志文件盈蛮,我們知道文件名稱是根據(jù)事務(wù)id來創(chuàng)建的废菱,所以,文件的排序也等價于事務(wù)的排序
            List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
            for (File f: files) {
                //找出起始事務(wù)ID大于ZXID的日志文件
                if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
                    storedFiles.add(f);
                }
                //當(dāng)?shù)谝淮伪闅v到起始ID小于ZXID的日志文件后抖誉,要記得把該文件也作為查找目標(biāo)文件殊轴,因為它里面可能包含大于ZXID的事務(wù)。
                //同時停止遍歷袒炉,因為后面繼續(xù)遍歷下去也沒意思旁理,都是小于ZXID的日志文件。
                else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
                    storedFiles.add(f);
                    break;
                }
            }
          //找出已排好序且可能存在大于ZXID的日志文件后我磁,打開第一個日志文件輸入流準(zhǔn)備讀取
            goToNextLog();
            //注意這個時候調(diào)用next()只是獲取第一個日志文件中的第一個事務(wù)ID孽文,該事務(wù)ID并不一定是ZXID。
            next();
        }
    //開始讀取下一個事務(wù)
     public boolean next() throws IOException {
            if (ia == null) {
                return false;
            }
            try {
                //讀取校驗和的值
                long crcValue = ia.readLong("crcvalue");
                //讀取事務(wù)
                byte[] bytes = Util.readTxnBytes(ia);
                //因為我們是采用預(yù)分配內(nèi)存方式夺艰,會定義一個EOF作為空的事務(wù)叛溢。所以,當(dāng)我們讀取到一個空的劲适,也就表明日志文件已讀到末尾
                if (bytes == null || bytes.length==0) {
                    throw new EOFException("Failed to read " + logFile);
                }
                //分析校驗和的值是否正確楷掉,防止消息被破壞。這就是為什么我們在append的時候要加入校驗和
                Checksum crc = makeChecksumAlgorithm();
                crc.update(bytes, 0, bytes.length);
                if (crcValue != crc.getValue())
                    throw new IOException(CRC_ERROR);
                //反序列事務(wù)
                hdr = new TxnHeader();
                record = SerializeUtils.deserializeTxn(bytes, hdr);
            } catch (EOFException e) {
                LOG.debug("EOF exception " + e);
                inputStream.close();
                inputStream = null;
                ia = null;
                hdr = null;
                //日志文件已讀到末尾了霞势,所以要跳到下一個文件開始讀取
                if (!goToNextLog()) {
                    return false;
                }
                // if we went to the next log file, we should call next() again
                return next();
            } catch (IOException e) {
                inputStream.close();
                throw e;
            }
            return true;
        }

FileTxnLog除了提供對事務(wù)日志的讀寫之外烹植,還提供了其它的一些額外方法,下面我們繼續(xù)看些這些方法

getLogFiles:獲取可能包含比目標(biāo)事務(wù)ID大的日志文件的數(shù)組(服務(wù)器啟動并恢復(fù)內(nèi)存數(shù)據(jù)庫的時候會調(diào)用這個方法進(jìn)行內(nèi)存數(shù)據(jù)庫恢復(fù))
/***
     *
     * @param logDirList 日志文件列表
     * @param snapshotZxid  通過內(nèi)存快照恢復(fù)的最大的事務(wù)id愕贡,剩余的比snapshotZxid就要從日志文件里恢復(fù)
     * @return 找出比包含有<=snapshotZxid的事務(wù)id的日志文件列表,當(dāng)snapshotZxid=0時草雕,獲取所有的文件
     */
    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
        //對日志文件進(jìn)行排序,按照事務(wù)ID從高到低
        List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
        long logZxid = 0;
        // Find the log file that starts before or at the same time as the
        // zxid of the snapshot
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            //如果文件名的事務(wù)id:fzxid>快照的最新的zxid
            if (fzxid > snapshotZxid) {
                continue;
            }
            //如果fzxid <= snapshotZxid && fzxid > logZxid
            if (fzxid > logZxid) {
                logZxid = fzxid;
            }
        }
        List<File> v=new ArrayList<File>(5);
        for (File f : files) {
            long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
            if (fzxid < logZxid) {//找出文件id大于logZxid的文件名
                continue;
            }
            v.add(f);
        }
        return v.toArray(new File[0]);

    }
getLastLoggedZxid獲取最新的事務(wù)ID
    //該方法只在節(jié)點服務(wù)器啟動的時候被調(diào)用
    public long getLastLoggedZxid() {//從日志文件中獲取最大的Zxid
        //找出所有的日志文件并排序(其實可以排序后拿第一個就好了肮桃浴墩虹?)
        File[] files = getLogFiles(logDir.listFiles(), 0);
        //排序日志文件嘱巾,并從日志文件名稱上獲取包含最大zxid的日志文件的文件名中的日志id
        long maxLog=files.length>0?
                Util.getZxidFromName(files[files.length-1].getName(),LOG_FILE_PREFIX):-1;

        // 在最新的日志文件里迭代查找最新的事務(wù)ID
        long zxid = maxLog;
        TxnIterator itr = null;
        try {
            FileTxnLog txn = new FileTxnLog(logDir);
            //根據(jù)文件名的事務(wù)id遍歷迭代該日志文件,獲取整個內(nèi)存數(shù)據(jù)庫的最大事務(wù)id,
            itr = txn.read(maxLog);
            while (true) {
                if(!itr.next())
                    break;
                TxnHeader hdr = itr.getHeader();
                zxid = hdr.getZxid();
            }
        } catch (IOException e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            close(itr);
        }
        return zxid;
    }
commit

commit方法诫钓,提交日志并刷至磁盤旬昭,force方法會把所有未寫磁盤的數(shù)據(jù)都強(qiáng)制寫入磁盤。 這是因為在操作系統(tǒng)中出于性能考慮回把數(shù)據(jù)放入緩沖區(qū)菌湃,所以不能保證數(shù)據(jù)在調(diào)用write寫入文件通道后就及時寫到磁盤上了问拘,除非手動調(diào)用force方法。 force方法需要一個布爾參數(shù)惧所,代表是否把meta data也一并強(qiáng)制寫入骤坐。

 /***
     * 提交事務(wù),確保日志刷新到磁盤中
     * @throws IOException
     */
    public synchronized void commit() throws IOException {
        if (logStream != null) {//刷新
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush();
            if (forceSync) {//是否強(qiáng)制刷盤
                long startSyncNS = System.nanoTime();

                FileChannel channel = log.getChannel();
                channel.force(false);

                syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) {
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "File size is " + channel.size() + " bytes. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) {
            streamsToFlush.removeFirst().close();
        }
    }
truncate:截斷刪除比zxid大的事務(wù)

truncate清空大于給定的zxid事務(wù)日志下愈,集群版learner向leader同步的時候纽绍,leader告訴learner需要回滾同步調(diào)用Learner#syncWithLeader

public boolean truncate(long zxid) throws IOException {
        FileTxnIterator itr = null;
        try {
            //找出大于zxid的事務(wù)迭代器
            itr = new FileTxnIterator(this.logDir, zxid);
            PositionInputStream input = itr.inputStream;
            if(input == null) {
                throw new IOException("No log files found to truncate! This could " +
                        "happen if you still have snapshots from an old setup or " +
                        "log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
            }
            long pos = input.getPosition();
            RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
            raf.setLength(pos);//把當(dāng)前l(fā)og后面的部分(比zxid更大的)截斷
            raf.close();
            while(itr.goToNextLog()) {
                if (!itr.logFile.delete()) {//把后面的log文件都刪除
                    LOG.warn("Unable to truncate {}", itr.logFile);
                }
            }
        } finally {
            close(itr);
        }
        return true;
    }

FileTxnIterator

在這里我們發(fā)現(xiàn)在根據(jù)zxid進(jìn)行read的時候會返回一個FileTxnIterator,所以我么有必要介紹這個FileTxnIterator

我們看下這個類有哪些屬性

    public static class FileTxnIterator implements TxnLog.TxnIterator {
        File logDir;//日志文件存放目錄
        //開始讀取的起始zxid
        long zxid;//迭代器的開始zxid势似,也就是這個迭代器主要是用來存放比我們要查找的zxid大的事務(wù)
        TxnHeader hdr;//事務(wù)頭
        Record record;
        File logFile;//當(dāng)前流指向的文件
        InputArchive ia;
        static final String CRC_ERROR="CRC check failed";

        PositionInputStream inputStream=null;
        //存放包含比我們需要查找的zxid大的事務(wù)id的日志文件列表
        private ArrayList<File> storedFiles;
}
構(gòu)造函數(shù)
 public FileTxnIterator(File logDir, long zxid, boolean fastForward)
                throws IOException {
            this.logDir = logDir;
            this.zxid = zxid;
            //過濾出所有需要讀的日志文件拌夏,并利用goToNextLog()方法打開第一個日志日志文件的輸入流
            init();
            if (fastForward && hdr != null) {
                while (hdr.getZxid() < zxid) {
                    if (!next())
                        break;
                }
            }
        }
init

init方法中過濾出所有需要讀的日志文件

 void init() throws IOException {
            //storedFiles按照事務(wù)id從大到小排序
            storedFiles = new ArrayList<File>();
            //排序日志文件
            List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
            for (File f: files) {//迭代日志文件并找出可能存在事務(wù)id大于zxid的日志文件
                if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
                    storedFiles.add(f);
                }
                // 當(dāng)執(zhí)行到這步,說明后面的日志都比給定的zxid小叫编,就沒必要繼續(xù)遍歷辖佣,直接break
                else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
                    storedFiles.add(f);
                    break;
                }
            }
            //獲打開第一個日志日志文件的輸入流,也就是zxid最小的
            goToNextLog();
            //next方法用來從日志文件中讀取一條記錄搓逾,校驗并反序列化出來卷谈,
            //讀取成功返回true,如果讀到了文件末尾則調(diào)goToNextLog()讀下一個文件霞篡,以此遞歸直到最后
            next();
        }

goToNextLog
    //打開第一個日志文件輸入流
        private boolean goToNextLog() throws IOException {
            if (storedFiles.size() > 0) {
                this.logFile = storedFiles.remove(storedFiles.size()-1);
                ia = createInputArchive(this.logFile);
                return true;
            }
            return false;
        }
next

next方法用來從日志文件中讀取一條記錄世蔗,校驗并反序列化出來,讀取成功返回true朗兵,如果讀到了文件末尾調(diào)goToNextLog()讀下一個文件污淋,以此遞歸直到最后

 //讀取下一個事務(wù),并檢查事務(wù)的萬完整性余掖,包括事務(wù)頭信息
        public boolean next() throws IOException {
            if (ia == null) {
                return false;
            }
            try {
                long crcValue = ia.readLong("crcvalue");
                byte[] bytes = Util.readTxnBytes(ia);
                // Since we preallocate, we define EOF to be an
                if (bytes == null || bytes.length==0) {
                    throw new EOFException("Failed to read " + logFile);
                }
                // 檢查文件是否被破壞
                Checksum crc = makeChecksumAlgorithm();
                crc.update(bytes, 0, bytes.length);
                if (crcValue != crc.getValue())
                    throw new IOException(CRC_ERROR);
                hdr = new TxnHeader();
                //反序列事務(wù)信息
                record = SerializeUtils.deserializeTxn(bytes, hdr);
            } catch (EOFException e) {
                LOG.debug("EOF exception " + e);
                inputStream.close();
                inputStream = null;
                ia = null;
                hdr = null;
                // 執(zhí)行到這邊意味著文件已經(jīng)讀到末尾了寸爆,就要把留指向下一個文件
                if (!goToNextLog()) {
                    return false;
                }
                // if we went to the next log file, we should call next() again
                return next();
            } catch (IOException e) {
                inputStream.close();
                throw e;
            }
            return true;
        }

總結(jié)

問題

問題1、我們知道zookeeper每次生成的事務(wù)日志都帶有當(dāng)前文件的第一條事務(wù)的zxid盐欺,這有什么好處呢赁豆?
(1)它可以幫助我們快速的定位某一個事務(wù)操作所在的日志文件。
(2)我們知道冗美,事務(wù)的zxid中高32位包含了epoch魔种,這個是leader所屬的周期,因此這樣我們可以通過日志文件名就清楚的知道粉洼,當(dāng)前運行時的zookeeper所屬的leader周期节预。

問題2叶摄、在前面,我們知道安拟,每次append寫入事務(wù)的時蛤吓,我們都會檢測事務(wù)文件日志當(dāng)前剩余的空間是否大于4kb,如果小于4kb去扣,則會在現(xiàn)有的文件基礎(chǔ)上加上64MB柱衔,然后使用0來填充樊破?那么為什么要使用這種預(yù)分配的形式呢愉棱?
我們都知道,對于客戶端每次的事務(wù)提交哲戚,都要將事務(wù)寫入到事務(wù)日志中奔滑,所以事務(wù)日志寫入的性能決定了zookeeper對客戶端的請求的響應(yīng)。也就是說顺少,事務(wù)每次的請求可以看作是一次對底層磁盤的IO操作朋其。嚴(yán)格的講,文件的不斷追加寫入操作會觸發(fā)底層磁盤IO為文件不斷的開辟新的磁盤塊脆炎,即磁盤seek梅猿。因此為了減少seek的頻率,從而提高zookeeper的IO響應(yīng)的時間秒裕,創(chuàng)建事務(wù)日志的時候都會進(jìn)行文件的預(yù)分配--在文件處建之時袱蚓,就會向操作系統(tǒng)預(yù)分配一塊很大的磁盤塊,默認(rèn)是64mb几蜻,而一旦分配的磁盤塊剩余的空間<4kb喇潘,則會再次分配,這樣就可以避免隨著每次事務(wù)的寫入過程中導(dǎo)致日志文件的不斷增長而需要不斷的觸發(fā)seek梭稚。事務(wù)預(yù)分配的大小颖低,可以通過系統(tǒng)參數(shù)zookeeper.preAllocsize來設(shè)置。

問題3弧烤、事務(wù)日志文件是如何檢查一個事務(wù)日志文件的完整性呢忱屑?
事務(wù)日志文件為了保證和檢查其文件的完整性和數(shù)據(jù)的準(zhǔn)確性。zookeeper在每次事務(wù)操作寫入前暇昂,都會根據(jù)系列化的字節(jié)數(shù)組來計算checksum莺戒,這樣當(dāng)我們重新載入事務(wù)的時候,就可以檢查這個事務(wù)文件的完整性了话浇。zookeeper采用Adler32算法來計算checksum脏毯。

問題4沪蓬、事務(wù)是什么時候刷盤的水评?
我們剛才講過,事務(wù)每次刷盤都是一次IO操作,所以為了減少刷盤的次數(shù)另玖,從而提高響應(yīng)性能,zookeeper會將每次事務(wù)的請求寫入都是先寫到一個緩沖流中其掂,而并非真正的刷盤到磁盤上去缤底,那么在什么時候輸盤到磁盤中呢?zookeeper服務(wù)器在啟動的時候會單獨啟動一個requestProcessor線程來處理這個請求隊列queuedRequests自娩,如果隊列里面有待處理的事務(wù)請求用踩,則該線程將會取出隊列事務(wù)并寫入到事務(wù)日志文件中,這個時候的寫入是先寫入到一個緩沖流中忙迁,當(dāng)requestProcessor統(tǒng)計寫入緩沖流的事務(wù)超過1000或者隊列已經(jīng)沒有事務(wù)了脐彩,則會開始將緩沖流中的數(shù)據(jù)刷到磁盤塊中。至于刷盤的方式是可選擇的姊扔,通過配置控制它是異步還是同步刷到磁盤中惠奸。

問題5、事務(wù)日志的截斷方法什么請下會觸發(fā)恰梢?
由于在zookeeper運行中佛南,可能由于一些異常情況會導(dǎo)致learner的lastzxid比leader的還大,無論這種情況是怎么發(fā)生的嵌言,這都是一種不正常的現(xiàn)象嗅回。為了遵循一個集群中,只要存在leader摧茴,那么所有機(jī)器都必須與該leader的數(shù)據(jù)進(jìn)行同步绵载,所以leader會向learner觸發(fā)truc方法,要求這個leaner對日志進(jìn)行截斷蓬蝶。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末尘分,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子丸氛,更是在濱河造成了極大的恐慌培愁,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缓窜,死亡現(xiàn)場離奇詭異定续,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)禾锤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門私股,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人恩掷,你說我怎么就攤上這事倡鲸。” “怎么了黄娘?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵峭状,是天一觀的道長克滴。 經(jīng)常有香客問我,道長优床,這世上最難降的妖魔是什么劝赔? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮胆敞,結(jié)果婚禮上着帽,老公的妹妹穿的比我還像新娘。我一直安慰自己移层,他們只是感情好仍翰,可當(dāng)我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著幽钢,像睡著了一般歉备。 火紅的嫁衣襯著肌膚如雪傅是。 梳的紋絲不亂的頭發(fā)上匪燕,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天,我揣著相機(jī)與錄音喧笔,去河邊找鬼帽驯。 笑死,一個胖子當(dāng)著我的面吹牛书闸,可吹牛的內(nèi)容都是我干的尼变。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼浆劲,長吁一口氣:“原來是場噩夢啊……” “哼嫌术!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起牌借,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤度气,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后膨报,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體磷籍,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年现柠,在試婚紗的時候發(fā)現(xiàn)自己被綠了院领。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡够吩,死狀恐怖比然,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情周循,我是刑警寧澤强法,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布扒寄,位于F島的核電站,受9級特大地震影響拟烫,放射性物質(zhì)發(fā)生泄漏该编。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一硕淑、第九天 我趴在偏房一處隱蔽的房頂上張望课竣。 院中可真熱鬧,春花似錦置媳、人聲如沸于樟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽迂曲。三九已至,卻和暖如春寥袭,著一層夾襖步出監(jiān)牢的瞬間路捧,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工传黄, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留杰扫,地道東北人。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓膘掰,卻偏偏與公主長得像章姓,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子识埋,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容