在講內(nèi)存數(shù)據(jù)庫ZKDatabase之前,我們來先了解下zookeeper內(nèi)存數(shù)據(jù)庫的兩種持久化方式:
FileTxnLog:事務(wù)日志文件冬殃,以日志追加的方式維護(hù)囚痴。
FileSnap:內(nèi)存快照文件,保存內(nèi)存數(shù)據(jù)庫某一時刻的狀態(tài)审葬,所以數(shù)據(jù)不一定是最新的深滚。
然后我們再看一下zookeeper持久化總體的框架
TxnLog:是一個讀取日志的接口,提供了讀取事務(wù)log的接口方法涣觉。
SnapShot:是一個操作日志快照的接口痴荐,提供了對快照文件操作的方法。
FileTxnLog:實現(xiàn)TxnLog接口官册,提供了讀取事務(wù)日志的方法實現(xiàn)生兆。
FileSnap:實現(xiàn)Snapshot接口,負(fù)責(zé)存儲攀隔、序列化皂贩、反序列化栖榨、訪問快照昆汹。
FileTxnSnapLog,封裝了TxnLog和SnapShot婴栽。
Util满粗,工具類,提供持久化所需的API愚争。
我們可以看到映皆,zookeeper總共提供了兩種持久化文件挤聘,分別是內(nèi)存快照SnapShot和事務(wù)日志TxnLog(這種日志有點類似MySQL數(shù)據(jù)庫中的binlog,zookeeper會把所有涉及到修改內(nèi)存數(shù)據(jù)結(jié)構(gòu)的操作日志記錄到該log中捅彻,也就是說组去,zookeeper會把每一個事務(wù)操作諸如添加、刪除都會記錄到這個日志文件中步淹,當(dāng)zookeeper出現(xiàn)異常時从隆,可以借助該事務(wù)日志進(jìn)行數(shù)據(jù)恢復(fù))。今天我們主要看下事務(wù)日志
事務(wù)日志FileTxnLog
我們知道缭裆,日志文件它主要是負(fù)責(zé)實時記錄對服務(wù)端的每一次的事務(wù)操作日志(這里講的事務(wù)和數(shù)據(jù)庫中事務(wù)不一樣键闺,它是指涉及到對服務(wù)器端的內(nèi)存數(shù)據(jù)庫的增刪改這種會變更內(nèi)存數(shù)據(jù)庫的操作行為,不記錄查詢)澈驼。通過這種日志文件辛燥,當(dāng)zookeeper因為故障而發(fā)生重啟時,我們就可以根據(jù)內(nèi)存快照文件和事務(wù)日志使得內(nèi)存數(shù)據(jù)庫恢復(fù)最新的數(shù)據(jù)庫狀態(tài)缝其。在zookeeper中挎塌,F(xiàn)ileTxnLog就是負(fù)責(zé)日志文件持久化的邏輯對象,它是TxnLog的一個實現(xiàn)類内边。它會通過在分配內(nèi)存時會預(yù)分配固定大的內(nèi)存大胁;同時保證每次寫的時候都是直接追加順序?qū)懭爰俨校瑥亩WC日志文件的性能缭贡。所以下面我們就開始來了解下FileTxnLog。
首先我們來看下TxnLog它給我們哪些方法:
void rollLog() throws IOException;// 滾動日志辉懒,從當(dāng)前日志滾到下一個日志阳惹,不是回滾
boolean append(TxnHeader hdr, Record r) throws IOException;//追加一個請求至事務(wù)性日志
TxnIterator read(long zxid) throws IOException;// 可迭代讀取事務(wù)性日志
long getLastLoggedZxid() throws IOException;//事務(wù)性操作的最新zxid
boolean truncate(long zxid) throws IOException;// 清空zxid以后的日志
long getDbId() throws IOException;// 獲取數(shù)據(jù)庫的id
void commit() throws IOException;// 提交事務(wù)并進(jìn)行確認(rèn)
void close() throws IOException;// 關(guān)閉事務(wù)性日志
FileTxnLog是TxnLog的一個實現(xiàn)類,所以它也就負(fù)責(zé)了實現(xiàn)該接口上的方法眶俩,我們來看下它是怎么實現(xiàn)的莹汤,對于一個日志文件,特別要關(guān)注的是它的讀寫操作的性能颠印。
FileTxnLog的具體屬性
public class FileTxnLog implements TxnLog {
private static final Logger LOG;
//預(yù)分配64m大小
static long preAllocSize = 65536 * 1024;
//直接內(nèi)存
private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
//魔數(shù)纲岭,用于校驗日志文件的正確性,默認(rèn)為1514884167
public final static int TXNLOG_MAGIC =ByteBuffer.wrap("ZKLG".getBytes()).getInt();
public final static int VERSION = 2;
//日志文件名好的前綴
public static final String LOG_FILE_PREFIX = "log";
/** Maximum time we allow for elapsed fsync before WARNing */
private final static long fsyncWarningThresholdMS;
static {
LOG = LoggerFactory.getLogger(FileTxnLog.class);
//獲得系統(tǒng)參數(shù)线罕,判斷系統(tǒng)參數(shù)配置了預(yù)分配內(nèi)存大小
String size = System.getProperty("zookeeper.preAllocSize");
if (size != null) {
try {
preAllocSize = Long.parseLong(size) * 1024;
} catch (NumberFormatException e) {
LOG.warn(size + " is not a valid value for preAllocSize");
}
}
/** Local variable to read fsync.warningthresholdms into */
Long fsyncWarningThreshold;
if ((fsyncWarningThreshold = Long.getLong("zookeeper.fsync.warningthresholdms")) == null)
fsyncWarningThreshold = Long.getLong("fsync.warningthresholdms", 1000);
fsyncWarningThresholdMS = fsyncWarningThreshold;
}
// 最大(也就是最新)的zxid
long lastZxidSeen;
volatile BufferedOutputStream logStream = null;
volatile OutputArchive oa;
volatile FileOutputStream fos = null;
//log目錄文件
File logDir;
//是否強(qiáng)制同步止潮,默認(rèn)是yes
private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");
long dbId;
private LinkedList<FileOutputStream> streamsToFlush =
new LinkedList<FileOutputStream>();
// 當(dāng)前配置的大小
long currentSize;
// 寫日志文件
File logFileWrite = null;
private volatile long syncElapsedMS = -1L;
}
FileTxnLog方法
(1)append: 主要是負(fù)責(zé)日志追加,在對日志文件的寫操作時钞楼,zookeeper主要是通過日志追加的方法
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
//校驗頭部不能為空,hdr主要包含了czxid喇闸、clientId、zxid等相關(guān)信息
if (hdr == null) {
return false;
}
//如果待寫入的事務(wù)的事務(wù)id小于本地保存的最新的事務(wù)id,給提醒
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
//在第一次新建一個FileTxnLog時候燃乍,logStream還是空的唆樊,這個時候需要為它創(chuàng)建一個新的日志文件,并把logStream指向這個日志文件
if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
//根據(jù)待寫入的事務(wù)id創(chuàng)建一個新的日志文件刻蟹,我們可以看到文件名包含這個文件存放的事務(wù)的最小事務(wù)id
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
//根據(jù)魔數(shù)逗旁、版本號和數(shù)據(jù)庫id生成日志文件頭,dbId默認(rèn)是0
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// 確保在用0填充之前,先把魔數(shù)信息等寫入到文件中舆瘪,進(jìn)行依次flush
logStream.flush();
currentSize = fos.getChannel().position();//獲取當(dāng)前文件流中的大小
streamsToFlush.add(fos);
}
//重新計算文件大小痢艺,保證文件的大小是預(yù)分配大小的整數(shù)倍
//可以讓文件盡可能的占用連續(xù)的磁盤扇區(qū),減少后續(xù)寫入和讀取文件時的磁盤尋道開銷介陶;
//迅速占用磁盤空間堤舒,防止使用過程中所需空間不足
currentSize = padFile(fos.getChannel());
//序列化TxnHeader Record記錄到byte[]
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
//創(chuàng)建校驗和的算法,默認(rèn)是Adler32
//Adler-32可用于計算數(shù)據(jù)流的校驗和 校驗和哺呜,幾乎與 CRC-32 一樣可靠舌缤,但是能夠更快地計算出來
Checksum crc = makeChecksumAlgorithm();
//使用指定的字節(jié)數(shù)組更新校驗和
crc.update(buf, 0, buf.length);
//將更新的校驗和值寫入到日志文件中
oa.writeLong(crc.getValue(), "txnEntryCRC");
//將TxnHeader Record數(shù)據(jù)寫入到輸出流
Util.writeTxnBytes(oa, buf);
return true;
}
//1.先計算buf數(shù)據(jù)長度寫入
//2.寫入buf數(shù)組數(shù)據(jù)
// 3.記錄尾部以’B’字符結(jié)尾,寫入0x42
public static void writeTxnBytes(OutputArchive oa, byte[] bytes)
throws IOException {
oa.writeBuffer(bytes, "txnEntry");
oa.writeByte((byte) 0x42, "EOR"); // 'B'
}
我們再看看下它是怎么計算和填充為文件分配的大小
private long padFile(FileChannel fileChannel) throws IOException {
//計算新的文件大小某残,并通過填充0先占用未使用的byte空間国撵,
//這樣可以讓文件盡可能的占用連續(xù)的磁盤扇區(qū),減少后續(xù)寫入和讀取文件時的磁盤尋道開銷
// currentSize默認(rèn)是0
long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
if (currentSize != newFileSize) {//將整個日志文件中未使用的部分填充0
fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
currentSize = newFileSize;
}
return currentSize;
}
/***
*
* @param position 通過管道寫入的字節(jié)長度
* @param fileSize 當(dāng)前設(shè)置的文件大小
* @param preAllocSize 與分配大小
* @return
*/
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
// I如果剩余空間不足4k且預(yù)分配空間大于0
if (preAllocSize > 0 && position + 4096 >= fileSize) {
//如果已寫入的長度超過了文件大小玻墅,文件大小擴(kuò)為 寫入的字節(jié)長度+預(yù)分配的大小
if (position > fileSize){//剛創(chuàng)建的時候肯定走這個介牙,這樣就可以保證fileSize始終是preAllocSize的整數(shù)倍
fileSize = position + preAllocSize;
//這邊會重新調(diào)整到預(yù)分配塊長度的整數(shù)倍(是否是為了方便管理統(tǒng)計等?)
fileSize -= fileSize % preAllocSize;
} else {
fileSize += preAllocSize;
}
}
return fileSize;
}
現(xiàn)在我們對FileTxnLog文件的寫應(yīng)該有一定的了解澳厢。也知道环础,在文件新建的時候會預(yù)分配文件內(nèi)存大小,并用0來填充剩拢,從而保證文件的磁盤占用是連續(xù)的线得,同時通過日志追加的方式,我們可以保證對日志文件的寫的順序性徐伐,從而保證了寫性能贯钩;我們也可以到,每次將事務(wù)寫入到日志文件中時办素,都會先根據(jù)寫入的事務(wù)計算并寫入一個校驗和角雷,然后再把事務(wù)流寫入到日志文件中,這樣可以充分保證事務(wù)日志的安全性和完整性性穿。
(2)read:看完寫文件操作勺三,我們當(dāng)然想知道讀文件的操作。因為讀寫是一一對應(yīng)的季二。文件的讀取檩咱,zookeepeer給我們提供了兩種重載的方法:
/***
* zxid:指定迭代讀取日志文件中的第一個事務(wù)ID
* 默認(rèn)fastForward=true
*/
TxnIterator read(long zxid)
/***
* zxid:指定迭代讀取日志文件中的第一個事務(wù)ID
* fastForward:
* true:則返回的迭代器只包含大于等于zxid的事務(wù)
* fasle:則返回的迭代器除了包含大于等于zxid的事務(wù)揭措,還包含了跟ZXID同一個日志文件的ZXID
*/
TxnIterator read(long zxid, boolean fastForward)
下面我看下read具體是怎么實現(xiàn)的
public TxnIterator read(long zxid, boolean fastForward) throws IOException {
return new FileTxnIterator(logDir, zxid, fastForward);
}
/***
* 讀取事務(wù)日志胯舷, 這個方法在服務(wù)當(dāng)機(jī)恢復(fù)的時候刻蚯,用來遍歷事務(wù)日志來恢復(fù)數(shù)據(jù)
* 根據(jù)目標(biāo)事務(wù)zxid,從日志文件中讀取大于該事務(wù)id的事務(wù)桑嘶,并返回這些事務(wù)構(gòu)成的迭代器TxnIterator
* 注意底層在遍歷每一個日志文件的時候炊汹,會對文件進(jìn)行魔數(shù)校驗等,避免文件被損壞
* @param zxid 迭代器的開始位置
* @return
* @throws IOException
*/
public FileTxnIterator(File logDir, long zxid, boolean fastForward)
throws IOException {
this.logDir = logDir;//日志文件存放目錄
this.zxid = zxid;//目標(biāo)事務(wù)ID
init();
//在init()方法里逃顶,我們拿到目標(biāo)文件的第一個事務(wù)ID讨便,這個時候如果fastForward 是true的話,就要繼續(xù)往下遍歷以政,找出目標(biāo)zxid的事務(wù)霸褒,才進(jìn)行停止。
//這里要注意hdr是上一次遍歷的事務(wù)頭
if (fastForward && hdr != null) {
while (hdr.getZxid() < zxid) {
if (!next())
break;
}
}
}
void init() throws IOException {
storedFiles = new ArrayList<File>();
//排序目錄下的日志文件盈蛮,我們知道文件名稱是根據(jù)事務(wù)id來創(chuàng)建的废菱,所以,文件的排序也等價于事務(wù)的排序
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
for (File f: files) {
//找出起始事務(wù)ID大于ZXID的日志文件
if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
storedFiles.add(f);
}
//當(dāng)?shù)谝淮伪闅v到起始ID小于ZXID的日志文件后抖誉,要記得把該文件也作為查找目標(biāo)文件殊轴,因為它里面可能包含大于ZXID的事務(wù)。
//同時停止遍歷袒炉,因為后面繼續(xù)遍歷下去也沒意思旁理,都是小于ZXID的日志文件。
else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
storedFiles.add(f);
break;
}
}
//找出已排好序且可能存在大于ZXID的日志文件后我磁,打開第一個日志文件輸入流準(zhǔn)備讀取
goToNextLog();
//注意這個時候調(diào)用next()只是獲取第一個日志文件中的第一個事務(wù)ID孽文,該事務(wù)ID并不一定是ZXID。
next();
}
//開始讀取下一個事務(wù)
public boolean next() throws IOException {
if (ia == null) {
return false;
}
try {
//讀取校驗和的值
long crcValue = ia.readLong("crcvalue");
//讀取事務(wù)
byte[] bytes = Util.readTxnBytes(ia);
//因為我們是采用預(yù)分配內(nèi)存方式夺艰,會定義一個EOF作為空的事務(wù)叛溢。所以,當(dāng)我們讀取到一個空的劲适,也就表明日志文件已讀到末尾
if (bytes == null || bytes.length==0) {
throw new EOFException("Failed to read " + logFile);
}
//分析校驗和的值是否正確楷掉,防止消息被破壞。這就是為什么我們在append的時候要加入校驗和
Checksum crc = makeChecksumAlgorithm();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue())
throw new IOException(CRC_ERROR);
//反序列事務(wù)
hdr = new TxnHeader();
record = SerializeUtils.deserializeTxn(bytes, hdr);
} catch (EOFException e) {
LOG.debug("EOF exception " + e);
inputStream.close();
inputStream = null;
ia = null;
hdr = null;
//日志文件已讀到末尾了霞势,所以要跳到下一個文件開始讀取
if (!goToNextLog()) {
return false;
}
// if we went to the next log file, we should call next() again
return next();
} catch (IOException e) {
inputStream.close();
throw e;
}
return true;
}
FileTxnLog除了提供對事務(wù)日志的讀寫之外烹植,還提供了其它的一些額外方法,下面我們繼續(xù)看些這些方法
getLogFiles:獲取可能包含比目標(biāo)事務(wù)ID大的日志文件的數(shù)組(服務(wù)器啟動并恢復(fù)內(nèi)存數(shù)據(jù)庫的時候會調(diào)用這個方法進(jìn)行內(nèi)存數(shù)據(jù)庫恢復(fù))
/***
*
* @param logDirList 日志文件列表
* @param snapshotZxid 通過內(nèi)存快照恢復(fù)的最大的事務(wù)id愕贡,剩余的比snapshotZxid就要從日志文件里恢復(fù)
* @return 找出比包含有<=snapshotZxid的事務(wù)id的日志文件列表,當(dāng)snapshotZxid=0時草雕,獲取所有的文件
*/
public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
//對日志文件進(jìn)行排序,按照事務(wù)ID從高到低
List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
long logZxid = 0;
// Find the log file that starts before or at the same time as the
// zxid of the snapshot
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
//如果文件名的事務(wù)id:fzxid>快照的最新的zxid
if (fzxid > snapshotZxid) {
continue;
}
//如果fzxid <= snapshotZxid && fzxid > logZxid
if (fzxid > logZxid) {
logZxid = fzxid;
}
}
List<File> v=new ArrayList<File>(5);
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
if (fzxid < logZxid) {//找出文件id大于logZxid的文件名
continue;
}
v.add(f);
}
return v.toArray(new File[0]);
}
getLastLoggedZxid獲取最新的事務(wù)ID
//該方法只在節(jié)點服務(wù)器啟動的時候被調(diào)用
public long getLastLoggedZxid() {//從日志文件中獲取最大的Zxid
//找出所有的日志文件并排序(其實可以排序后拿第一個就好了肮桃浴墩虹?)
File[] files = getLogFiles(logDir.listFiles(), 0);
//排序日志文件嘱巾,并從日志文件名稱上獲取包含最大zxid的日志文件的文件名中的日志id
long maxLog=files.length>0?
Util.getZxidFromName(files[files.length-1].getName(),LOG_FILE_PREFIX):-1;
// 在最新的日志文件里迭代查找最新的事務(wù)ID
long zxid = maxLog;
TxnIterator itr = null;
try {
FileTxnLog txn = new FileTxnLog(logDir);
//根據(jù)文件名的事務(wù)id遍歷迭代該日志文件,獲取整個內(nèi)存數(shù)據(jù)庫的最大事務(wù)id,
itr = txn.read(maxLog);
while (true) {
if(!itr.next())
break;
TxnHeader hdr = itr.getHeader();
zxid = hdr.getZxid();
}
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
} finally {
close(itr);
}
return zxid;
}
commit
commit方法诫钓,提交日志并刷至磁盤旬昭,force方法會把所有未寫磁盤的數(shù)據(jù)都強(qiáng)制寫入磁盤。 這是因為在操作系統(tǒng)中出于性能考慮回把數(shù)據(jù)放入緩沖區(qū)菌湃,所以不能保證數(shù)據(jù)在調(diào)用write寫入文件通道后就及時寫到磁盤上了问拘,除非手動調(diào)用force方法。 force方法需要一個布爾參數(shù)惧所,代表是否把meta data也一并強(qiáng)制寫入骤坐。
/***
* 提交事務(wù),確保日志刷新到磁盤中
* @throws IOException
*/
public synchronized void commit() throws IOException {
if (logStream != null) {//刷新
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
if (forceSync) {//是否強(qiáng)制刷盤
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
channel.force(false);
syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely effect operation latency. "
+ "File size is " + channel.size() + " bytes. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();
}
}
truncate:截斷刪除比zxid大的事務(wù)
truncate清空大于給定的zxid事務(wù)日志下愈,集群版learner向leader同步的時候纽绍,leader告訴learner需要回滾同步調(diào)用Learner#syncWithLeader
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = null;
try {
//找出大于zxid的事務(wù)迭代器
itr = new FileTxnIterator(this.logDir, zxid);
PositionInputStream input = itr.inputStream;
if(input == null) {
throw new IOException("No log files found to truncate! This could " +
"happen if you still have snapshots from an old setup or " +
"log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
}
long pos = input.getPosition();
RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
raf.setLength(pos);//把當(dāng)前l(fā)og后面的部分(比zxid更大的)截斷
raf.close();
while(itr.goToNextLog()) {
if (!itr.logFile.delete()) {//把后面的log文件都刪除
LOG.warn("Unable to truncate {}", itr.logFile);
}
}
} finally {
close(itr);
}
return true;
}
FileTxnIterator
在這里我們發(fā)現(xiàn)在根據(jù)zxid進(jìn)行read的時候會返回一個FileTxnIterator,所以我么有必要介紹這個FileTxnIterator
我們看下這個類有哪些屬性
public static class FileTxnIterator implements TxnLog.TxnIterator {
File logDir;//日志文件存放目錄
//開始讀取的起始zxid
long zxid;//迭代器的開始zxid势似,也就是這個迭代器主要是用來存放比我們要查找的zxid大的事務(wù)
TxnHeader hdr;//事務(wù)頭
Record record;
File logFile;//當(dāng)前流指向的文件
InputArchive ia;
static final String CRC_ERROR="CRC check failed";
PositionInputStream inputStream=null;
//存放包含比我們需要查找的zxid大的事務(wù)id的日志文件列表
private ArrayList<File> storedFiles;
}
構(gòu)造函數(shù)
public FileTxnIterator(File logDir, long zxid, boolean fastForward)
throws IOException {
this.logDir = logDir;
this.zxid = zxid;
//過濾出所有需要讀的日志文件拌夏,并利用goToNextLog()方法打開第一個日志日志文件的輸入流
init();
if (fastForward && hdr != null) {
while (hdr.getZxid() < zxid) {
if (!next())
break;
}
}
}
init
init方法中過濾出所有需要讀的日志文件
void init() throws IOException {
//storedFiles按照事務(wù)id從大到小排序
storedFiles = new ArrayList<File>();
//排序日志文件
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
for (File f: files) {//迭代日志文件并找出可能存在事務(wù)id大于zxid的日志文件
if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
storedFiles.add(f);
}
// 當(dāng)執(zhí)行到這步,說明后面的日志都比給定的zxid小叫编,就沒必要繼續(xù)遍歷辖佣,直接break
else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
storedFiles.add(f);
break;
}
}
//獲打開第一個日志日志文件的輸入流,也就是zxid最小的
goToNextLog();
//next方法用來從日志文件中讀取一條記錄搓逾,校驗并反序列化出來卷谈,
//讀取成功返回true,如果讀到了文件末尾則調(diào)goToNextLog()讀下一個文件霞篡,以此遞歸直到最后
next();
}
goToNextLog
//打開第一個日志文件輸入流
private boolean goToNextLog() throws IOException {
if (storedFiles.size() > 0) {
this.logFile = storedFiles.remove(storedFiles.size()-1);
ia = createInputArchive(this.logFile);
return true;
}
return false;
}
next
next方法用來從日志文件中讀取一條記錄世蔗,校驗并反序列化出來,讀取成功返回true朗兵,如果讀到了文件末尾調(diào)goToNextLog()讀下一個文件污淋,以此遞歸直到最后
//讀取下一個事務(wù),并檢查事務(wù)的萬完整性余掖,包括事務(wù)頭信息
public boolean next() throws IOException {
if (ia == null) {
return false;
}
try {
long crcValue = ia.readLong("crcvalue");
byte[] bytes = Util.readTxnBytes(ia);
// Since we preallocate, we define EOF to be an
if (bytes == null || bytes.length==0) {
throw new EOFException("Failed to read " + logFile);
}
// 檢查文件是否被破壞
Checksum crc = makeChecksumAlgorithm();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue())
throw new IOException(CRC_ERROR);
hdr = new TxnHeader();
//反序列事務(wù)信息
record = SerializeUtils.deserializeTxn(bytes, hdr);
} catch (EOFException e) {
LOG.debug("EOF exception " + e);
inputStream.close();
inputStream = null;
ia = null;
hdr = null;
// 執(zhí)行到這邊意味著文件已經(jīng)讀到末尾了寸爆,就要把留指向下一個文件
if (!goToNextLog()) {
return false;
}
// if we went to the next log file, we should call next() again
return next();
} catch (IOException e) {
inputStream.close();
throw e;
}
return true;
}
總結(jié)
問題
問題1、我們知道zookeeper每次生成的事務(wù)日志都帶有當(dāng)前文件的第一條事務(wù)的zxid盐欺,這有什么好處呢赁豆?
(1)它可以幫助我們快速的定位某一個事務(wù)操作所在的日志文件。
(2)我們知道冗美,事務(wù)的zxid中高32位包含了epoch魔种,這個是leader所屬的周期,因此這樣我們可以通過日志文件名就清楚的知道粉洼,當(dāng)前運行時的zookeeper所屬的leader周期节预。
問題2叶摄、在前面,我們知道安拟,每次append寫入事務(wù)的時蛤吓,我們都會檢測事務(wù)文件日志當(dāng)前剩余的空間是否大于4kb,如果小于4kb去扣,則會在現(xiàn)有的文件基礎(chǔ)上加上64MB柱衔,然后使用0來填充樊破?那么為什么要使用這種預(yù)分配的形式呢愉棱?
我們都知道,對于客戶端每次的事務(wù)提交哲戚,都要將事務(wù)寫入到事務(wù)日志中奔滑,所以事務(wù)日志寫入的性能決定了zookeeper對客戶端的請求的響應(yīng)。也就是說顺少,事務(wù)每次的請求可以看作是一次對底層磁盤的IO操作朋其。嚴(yán)格的講,文件的不斷追加寫入操作會觸發(fā)底層磁盤IO為文件不斷的開辟新的磁盤塊脆炎,即磁盤seek梅猿。因此為了減少seek的頻率,從而提高zookeeper的IO響應(yīng)的時間秒裕,創(chuàng)建事務(wù)日志的時候都會進(jìn)行文件的預(yù)分配--在文件處建之時袱蚓,就會向操作系統(tǒng)預(yù)分配一塊很大的磁盤塊,默認(rèn)是64mb几蜻,而一旦分配的磁盤塊剩余的空間<4kb喇潘,則會再次分配,這樣就可以避免隨著每次事務(wù)的寫入過程中導(dǎo)致日志文件的不斷增長而需要不斷的觸發(fā)seek梭稚。事務(wù)預(yù)分配的大小颖低,可以通過系統(tǒng)參數(shù)zookeeper.preAllocsize來設(shè)置。
問題3弧烤、事務(wù)日志文件是如何檢查一個事務(wù)日志文件的完整性呢忱屑?
事務(wù)日志文件為了保證和檢查其文件的完整性和數(shù)據(jù)的準(zhǔn)確性。zookeeper在每次事務(wù)操作寫入前暇昂,都會根據(jù)系列化的字節(jié)數(shù)組來計算checksum莺戒,這樣當(dāng)我們重新載入事務(wù)的時候,就可以檢查這個事務(wù)文件的完整性了话浇。zookeeper采用Adler32算法來計算checksum脏毯。
問題4沪蓬、事務(wù)是什么時候刷盤的水评?
我們剛才講過,事務(wù)每次刷盤都是一次IO操作,所以為了減少刷盤的次數(shù)另玖,從而提高響應(yīng)性能,zookeeper會將每次事務(wù)的請求寫入都是先寫到一個緩沖流中其掂,而并非真正的刷盤到磁盤上去缤底,那么在什么時候輸盤到磁盤中呢?zookeeper服務(wù)器在啟動的時候會單獨啟動一個requestProcessor線程來處理這個請求隊列queuedRequests自娩,如果隊列里面有待處理的事務(wù)請求用踩,則該線程將會取出隊列事務(wù)并寫入到事務(wù)日志文件中,這個時候的寫入是先寫入到一個緩沖流中忙迁,當(dāng)requestProcessor統(tǒng)計寫入緩沖流的事務(wù)超過1000或者隊列已經(jīng)沒有事務(wù)了脐彩,則會開始將緩沖流中的數(shù)據(jù)刷到磁盤塊中。至于刷盤的方式是可選擇的姊扔,通過配置控制它是異步還是同步刷到磁盤中惠奸。
問題5、事務(wù)日志的截斷方法什么請下會觸發(fā)恰梢?
由于在zookeeper運行中佛南,可能由于一些異常情況會導(dǎo)致learner的lastzxid比leader的還大,無論這種情況是怎么發(fā)生的嵌言,這都是一種不正常的現(xiàn)象嗅回。為了遵循一個集群中,只要存在leader摧茴,那么所有機(jī)器都必須與該leader的數(shù)據(jù)進(jìn)行同步绵载,所以leader會向learner觸發(fā)truc方法,要求這個leaner對日志進(jìn)行截斷蓬蝶。