數(shù)據(jù)分類
整體分為 3 類:
內(nèi)存數(shù)據(jù)
-
磁盤數(shù)據(jù): 磁盤數(shù)據(jù)又分為
1. 快照 2. 事務(wù)日志
在zk啟動過程中抑堡,3類數(shù)據(jù)之間的關(guān)系為:
內(nèi)存數(shù)據(jù)
ZK 的數(shù)據(jù)模型:樹
樹中單個節(jié)點(diǎn)包含的內(nèi)容:
節(jié)點(diǎn)數(shù)據(jù)
節(jié)點(diǎn) ACL 信息
節(jié)點(diǎn)的路徑
在Zookeeper中,數(shù)據(jù)存儲分為兩部分:內(nèi)存數(shù)據(jù)存儲和磁盤數(shù)據(jù)存儲,我們主要分析服務(wù)器啟動時內(nèi)存數(shù)據(jù)庫的初始化過程和主從服務(wù)器數(shù)據(jù)同步的過程,先介紹一下涉及的基本類
DataTree
Zookeeper的數(shù)據(jù)模型是一棵樹簇爆,DataTree是內(nèi)存數(shù)據(jù)存儲的核心在孝,代表了內(nèi)存中一份完整的數(shù)據(jù)(最新),包括所有的節(jié)點(diǎn)路徑浆竭,節(jié)點(diǎn)數(shù)據(jù)和ACL信息,對應(yīng)watches等。類的主要屬性為:
//節(jié)點(diǎn)路徑為key,節(jié)點(diǎn)數(shù)據(jù)內(nèi)容DataNode為value.實時存儲了所有的zk節(jié)點(diǎn)邦泄,使用ConcurrentHashMap保證并發(fā)性
private final ConcurrentHashMap<String, DataNode> nodes =new ConcurrentHashMap<String, DataNode>();
//節(jié)點(diǎn)數(shù)據(jù)對應(yīng)的watch
private final WatchManager dataWatches = new WatchManager();
//節(jié)點(diǎn)路徑對應(yīng)的watch
private final WatchManager childWatches = new WatchManager();
//key為sessionId,value為該會話對應(yīng)的臨時節(jié)點(diǎn)路徑删窒,方便實時訪問和清理
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
//This set contains the paths of all container nodes
private final Set<String> containers =Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
//This set contains the paths of all ttl nodes
private final Set<String> ttls =Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
//內(nèi)存數(shù)據(jù)庫的最大zxid
public volatile long lastProcessedZxid = 0;
DataNode
數(shù)據(jù)存儲的最小單元,包含節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容顺囊,節(jié)點(diǎn)狀態(tài)肌索,子節(jié)點(diǎn)列表,以及對子節(jié)點(diǎn)的操作接口等特碳,主要屬性為:
//節(jié)點(diǎn)內(nèi)容
byte data[];
Long acl;
//節(jié)點(diǎn)狀態(tài)诚亚,包括一些節(jié)點(diǎn)的元數(shù)據(jù),如ephemeralOwner午乓,czxid等
public StatPersisted stat;
//子節(jié)點(diǎn)相對父節(jié)點(diǎn)路徑集合站宗,不包括父節(jié)點(diǎn)路徑
private Set<String> children = null;
拋出 2 個問題:
1. DataTree 中 nodes 是 Map,表示所有的 ZK 節(jié)點(diǎn)益愈,那其內(nèi)部 key 是什么梢灭?
Re:ZNode 的唯一標(biāo)識 path 作為 key
2. ephemerals 是Map,用于存儲臨時節(jié)點(diǎn)腕唧,那其內(nèi)部 key 是什么或辖?value 又是什么?
Re:臨時節(jié)點(diǎn)是跟 Session 綁定的枣接,sessionId 作為 key
ZKDatabase
Zookeeper的內(nèi)存數(shù)據(jù)庫颂暇,負(fù)責(zé)管理Zookeeper的所有會話,DataTree存儲和事務(wù)日志但惶。它會定時向磁盤dump快照數(shù)據(jù)(snapCount主要控制)耳鸯,服務(wù)器啟動時,會通過磁盤上的事務(wù)日志和快照數(shù)據(jù)文件恢復(fù)成完整的內(nèi)存數(shù)據(jù)庫膀曾。主要屬性為:
protected DataTree dataTree;
//key為sessionId,value為會話過期時間
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
//用于和磁盤交互事務(wù)日志文件和快照文件的類
protected FileTxnSnapLog snapLog;
//主從數(shù)據(jù)同步時使用
protected long minCommittedLog, maxCommittedLog;
public static final int commitLogCount = 500;
protected static int commitLogBuffer = 700;
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
zookeeper內(nèi)存數(shù)據(jù)庫的兩種持久化方式:
FileTxnLog:事務(wù)日志文件县爬,以日志追加的方式維護(hù)。
FileSnap:內(nèi)存快照文件添谊,保存內(nèi)存數(shù)據(jù)庫某一時刻的狀態(tài)财喳,所以數(shù)據(jù)不一定是最新的。
TxnLog:是一個讀取日志的接口斩狱,提供了讀取事務(wù)log的接口方法耳高。
SnapShot:是一個操作日志快照的接口,提供了對快照文件操作的方法所踊。
FileTxnLog:實現(xiàn)TxnLog接口泌枪,提供了讀取事務(wù)日志的方法實現(xiàn)。
FileSnap:實現(xiàn)Snapshot接口秕岛,負(fù)責(zé)存儲碌燕、序列化误证、反序列化、訪問快照修壕。
FileTxnSnapLog愈捅,封裝了TxnLog和SnapShot。
Util叠殷,工具類改鲫,提供持久化所需的API。
我們可以看到林束,zookeeper總共提供了兩種持久化文件,分別是內(nèi)存快照SnapShot和事務(wù)日志TxnLog(這種日志有點(diǎn)類似MySQL數(shù)據(jù)庫中的binlog稽亏,zookeeper會把所有涉及到修改內(nèi)存數(shù)據(jù)結(jié)構(gòu)的操作日志記錄到該log中壶冒,也就是說,zookeeper會把每一個事務(wù)操作諸如添加截歉、刪除都會記錄到這個日志文件中胖腾,當(dāng)zookeeper出現(xiàn)異常時,可以借助該事務(wù)日志進(jìn)行數(shù)據(jù)恢復(fù))瘪松。
日志文件它主要是負(fù)責(zé)實時記錄對服務(wù)端的每一次的事務(wù)操作日志(這里講的事務(wù)和數(shù)據(jù)庫中事務(wù)不一樣咸作,它是指涉及到對服務(wù)器端的內(nèi)存數(shù)據(jù)庫的增刪改這種會變更內(nèi)存數(shù)據(jù)庫的操作行為,不記錄查詢)
通過這種日志文件宵睦,當(dāng)zookeeper因為故障而發(fā)生重啟時记罚,我們就可以根據(jù)內(nèi)存快照文件和事務(wù)日志使得內(nèi)存數(shù)據(jù)庫恢復(fù)最新的數(shù)據(jù)庫狀態(tài)。在zookeeper中壳嚎,F(xiàn)ileTxnLog就是負(fù)責(zé)日志文件持久化的邏輯對象桐智,它是TxnLog的一個實現(xiàn)類。它會通過在分配內(nèi)存時會預(yù)分配固定大的內(nèi)存大醒滔凇说庭;同時保證每次寫的時候都是直接追加順序?qū)懭耄瑥亩WC日志文件的性能郑趁。
首先我們來看下TxnLog它給我們哪些方法:
void rollLog() throws IOException;// 滾動日志刊驴,從當(dāng)前日志滾到下一個日志,不是回滾
boolean append(TxnHeader hdr, Record r) throws IOException;//追加一個請求至事務(wù)性日志
TxnIterator read(long zxid) throws IOException;// 可迭代讀取事務(wù)性日志
long getLastLoggedZxid() throws IOException;//事務(wù)性操作的最新zxid
boolean truncate(long zxid) throws IOException;// 清空zxid以后的日志
long getDbId() throws IOException;// 獲取數(shù)據(jù)庫的id
void commit() throws IOException;// 提交事務(wù)并進(jìn)行確認(rèn)
void close() throws IOException;// 關(guān)閉事務(wù)性日志
FileTxnLog是TxnLog的一個實現(xiàn)類寡润,所以它也就負(fù)責(zé)了實現(xiàn)該接口上的方法捆憎,我們來看下它是怎么實現(xiàn)的,對于一個日志文件悦穿,特別要關(guān)注的是它的讀寫操作的性能攻礼。
FileTxnLog
實現(xiàn)了TxnLog接口,提供了API可以獲取日志和寫入日志栗柒,
首先先看一下事務(wù)日志文件的格式
LogFile:
//一個日志文件由以下三個部分組成
* FileHeader TxnList ZeroPad
//1.文件頭
* FileHeader: {
* magic 4bytes (ZKLG)
* version 4bytes
* dbid 8bytes
* }
//事務(wù)內(nèi)容
* TxnList:
* Txn || Txn TxnList
* Txn:
//一條事務(wù)日志的組成部分
* checksum Txnlen TxnHeader Record 0x42
* checksum: 8bytes Adler32 is currently used
* calculated across payload -- Txnlen, TxnHeader, Record and 0x42
*
* Txnlen:
* len 4bytes
*
* TxnHeader: {
* sessionid 8bytes
* cxid 4bytes
* zxid 8bytes
* time 8bytes
* type 4bytes
* }
*
* Record:
* See Jute definition file for details on the various record types
*
* ZeroPad:
* 0 padded to EOF (filled during preallocation stage)
public class FileTxnLog implements TxnLog {
private static final Logger LOG;
//預(yù)分配64m大小
static long preAllocSize = 65536 * 1024;
//直接內(nèi)存
private static final ByteBuffer fill = ByteBuffer.allocateDirect(1);
//魔數(shù)礁扮,用于校驗日志文件的正確性知举,默認(rèn)為1514884167
public final static int TXNLOG_MAGIC =ByteBuffer.wrap("ZKLG".getBytes()).getInt();
public final static int VERSION = 2;
//日志文件名好的前綴
public static final String LOG_FILE_PREFIX = "log";
/** Maximum time we allow for elapsed fsync before WARNing */
private final static long fsyncWarningThresholdMS;
static {
LOG = LoggerFactory.getLogger(FileTxnLog.class);
//獲得系統(tǒng)參數(shù),判斷系統(tǒng)參數(shù)配置了預(yù)分配內(nèi)存大小
String size = System.getProperty("zookeeper.preAllocSize");
if (size != null) {
try {
preAllocSize = Long.parseLong(size) * 1024;
} catch (NumberFormatException e) {
LOG.warn(size + " is not a valid value for preAllocSize");
}
}
/** Local variable to read fsync.warningthresholdms into */
Long fsyncWarningThreshold;
if ((fsyncWarningThreshold = Long.getLong("zookeeper.fsync.warningthresholdms")) == null)
fsyncWarningThreshold = Long.getLong("fsync.warningthresholdms", 1000);
fsyncWarningThresholdMS = fsyncWarningThreshold;
}
// 最大(也就是最新)的zxid
long lastZxidSeen;
volatile BufferedOutputStream logStream = null;
volatile OutputArchive oa;
volatile FileOutputStream fos = null;
//log目錄文件
File logDir;
//是否強(qiáng)制同步太伊,默認(rèn)是yes
private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");
long dbId;
private LinkedList<FileOutputStream> streamsToFlush =
new LinkedList<FileOutputStream>();
// 當(dāng)前配置的大小
long currentSize;
// 寫日志文件
File logFileWrite = null;
private volatile long syncElapsedMS = -1L;
}
FileTxnLog方法
append: 主要是負(fù)責(zé)日志追加雇锡,在對日志文件的寫操作時,zookeeper主要是通過日志追加的方法
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
//校驗頭部不能為空,hdr主要包含了czxid僚焦、clientId锰提、zxid等相關(guān)信息
if (hdr == null) {
return false;
}
//如果待寫入的事務(wù)的事務(wù)id小于本地保存的最新的事務(wù)id,給提醒
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
//在第一次新建一個FileTxnLog時候芳悲,logStream還是空的立肘,這個時候需要為它創(chuàng)建一個新的日志文件,并把logStream指向這個日志文件
if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
//根據(jù)待寫入的事務(wù)id創(chuàng)建一個新的日志文件名扛,我們可以看到文件名包含這個文件存放的事務(wù)的最小事務(wù)id
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
//根據(jù)魔數(shù)谅年、版本號和數(shù)據(jù)庫id生成日志文件頭,dbId默認(rèn)是0
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// 確保在用0填充之前,先把魔數(shù)信息等寫入到文件中肮韧,進(jìn)行依次flush
logStream.flush();
currentSize = fos.getChannel().position();//獲取當(dāng)前文件流中的大小
streamsToFlush.add(fos);
}
//重新計算文件大小融蹂,保證文件的大小是預(yù)分配大小的整數(shù)倍
//可以讓文件盡可能的占用連續(xù)的磁盤扇區(qū),減少后續(xù)寫入和讀取文件時的磁盤尋道開銷弄企;
//迅速占用磁盤空間超燃,防止使用過程中所需空間不足
currentSize = padFile(fos.getChannel());
//序列化TxnHeader Record記錄到byte[]
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
//創(chuàng)建校驗和的算法,默認(rèn)是Adler32
//Adler-32可用于計算數(shù)據(jù)流的校驗和 校驗和拘领,幾乎與 CRC-32 一樣可靠意乓,但是能夠更快地計算出來
Checksum crc = makeChecksumAlgorithm();
//使用指定的字節(jié)數(shù)組更新校驗和
crc.update(buf, 0, buf.length);
//將更新的校驗和值寫入到日志文件中
oa.writeLong(crc.getValue(), "txnEntryCRC");
//將TxnHeader Record數(shù)據(jù)寫入到輸出流
Util.writeTxnBytes(oa, buf);
return true;
}
//1.先計算buf數(shù)據(jù)長度寫入
//2.寫入buf數(shù)組數(shù)據(jù)
// 3.記錄尾部以’B’字符結(jié)尾,寫入0x42
public static void writeTxnBytes(OutputArchive oa, byte[] bytes)
throws IOException {
oa.writeBuffer(bytes, "txnEntry");
oa.writeByte((byte) 0x42, "EOR"); // 'B'
}
我們再看看下它是怎么計算和填充為文件分配的大小
private long padFile(FileChannel fileChannel) throws IOException {
//計算新的文件大小院究,并通過填充0先占用未使用的byte空間洽瞬,
//這樣可以讓文件盡可能的占用連續(xù)的磁盤扇區(qū),減少后續(xù)寫入和讀取文件時的磁盤尋道開銷
// currentSize默認(rèn)是0
long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
if (currentSize != newFileSize) {//將整個日志文件中未使用的部分填充0
fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
currentSize = newFileSize;
}
return currentSize;
}
/***
*
* @param position 通過管道寫入的字節(jié)長度
* @param fileSize 當(dāng)前設(shè)置的文件大小
* @param preAllocSize 與分配大小
* @return
*/
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
// I如果剩余空間不足4k且預(yù)分配空間大于0
if (preAllocSize > 0 && position + 4096 >= fileSize) {
//如果已寫入的長度超過了文件大小业汰,文件大小擴(kuò)為 寫入的字節(jié)長度+預(yù)分配的大小
if (position > fileSize){//剛創(chuàng)建的時候肯定走這個伙窃,這樣就可以保證fileSize始終是preAllocSize的整數(shù)倍
fileSize = position + preAllocSize;
//這邊會重新調(diào)整到預(yù)分配塊長度的整數(shù)倍(是否是為了方便管理統(tǒng)計等?)
fileSize -= fileSize % preAllocSize;
} else {
fileSize += preAllocSize;
}
}
return fileSize;
}
現(xiàn)在我們對FileTxnLog文件的寫應(yīng)該有一定的了解样漆。也知道为障,在文件新建的時候會預(yù)分配文件內(nèi)存大小,并用0來填充放祟,從而保證文件的磁盤占用是連續(xù)的鳍怨,同時通過日志追加的方式,我們可以保證對日志文件的寫的順序性跪妥,從而保證了寫性能鞋喇;我們也可以到,每次將事務(wù)寫入到日志文件中時眉撵,都會先根據(jù)寫入的事務(wù)計算并寫入一個校驗和侦香,然后再把事務(wù)流寫入到日志文件中落塑,這樣可以充分保證事務(wù)日志的安全性和完整性。
read:看完寫文件操作罐韩,我們當(dāng)然想知道讀文件的操作憾赁。因為讀寫是一一對應(yīng)的。文件的讀取散吵,zookeepeer給我們提供了兩種重載的方法:
/***
* zxid:指定迭代讀取日志文件中的第一個事務(wù)ID
* 默認(rèn)fastForward=true
*/
TxnIterator read(long zxid)
/***
* zxid:指定迭代讀取日志文件中的第一個事務(wù)ID
* fastForward:
* true:則返回的迭代器只包含大于等于zxid的事務(wù)
* fasle:則返回的迭代器除了包含大于等于zxid的事務(wù)龙考,還包含了跟ZXID同一個日志文件的ZXID
*/
TxnIterator read(long zxid, boolean fastForward)
public TxnIterator read(long zxid, boolean fastForward) throws IOException {
return new FileTxnIterator(logDir, zxid, fastForward);
}
/***
* 讀取事務(wù)日志, 這個方法在服務(wù)當(dāng)機(jī)恢復(fù)的時候矾睦,用來遍歷事務(wù)日志來恢復(fù)數(shù)據(jù)
* 根據(jù)目標(biāo)事務(wù)zxid晦款,從日志文件中讀取大于該事務(wù)id的事務(wù),并返回這些事務(wù)構(gòu)成的迭代器TxnIterator
* 注意底層在遍歷每一個日志文件的時候顷锰,會對文件進(jìn)行魔數(shù)校驗等柬赐,避免文件被損壞
* @param zxid 迭代器的開始位置
* @return
* @throws IOException
*/
public FileTxnIterator(File logDir, long zxid, boolean fastForward)
throws IOException {
this.logDir = logDir;//日志文件存放目錄
this.zxid = zxid;//目標(biāo)事務(wù)ID
init();
//在init()方法里,我們拿到目標(biāo)文件的第一個事務(wù)ID官紫,這個時候如果fastForward 是true的話,就要繼續(xù)往下遍歷州藕,找出目標(biāo)zxid的事務(wù)束世,才進(jìn)行停止。
//這里要注意hdr是上一次遍歷的事務(wù)頭
if (fastForward && hdr != null) {
while (hdr.getZxid() < zxid) {
if (!next())
break;
}
}
}
void init() throws IOException {
storedFiles = new ArrayList<File>();
//排序目錄下的日志文件床玻,我們知道文件名稱是根據(jù)事務(wù)id來創(chuàng)建的毁涉,所以,文件的排序也等價于事務(wù)的排序
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
for (File f: files) {
//找出起始事務(wù)ID大于ZXID的日志文件
if (Util.getZxidFromName(f.getName(), "log") >= zxid) {
storedFiles.add(f);
}
//當(dāng)?shù)谝淮伪闅v到起始ID小于ZXID的日志文件后锈死,要記得把該文件也作為查找目標(biāo)文件贫堰,因為它里面可能包含大于ZXID的事務(wù)。
//同時停止遍歷待牵,因為后面繼續(xù)遍歷下去也沒意思其屏,都是小于ZXID的日志文件。
else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
storedFiles.add(f);
break;
}
}
//找出已排好序且可能存在大于ZXID的日志文件后缨该,打開第一個日志文件輸入流準(zhǔn)備讀取
goToNextLog();
//注意這個時候調(diào)用next()只是獲取第一個日志文件中的第一個事務(wù)ID偎行,該事務(wù)ID并不一定是ZXID。
next();
}
//開始讀取下一個事務(wù)
public boolean next() throws IOException {
if (ia == null) {
return false;
}
try {
//讀取校驗和的值
long crcValue = ia.readLong("crcvalue");
//讀取事務(wù)
byte[] bytes = Util.readTxnBytes(ia);
//因為我們是采用預(yù)分配內(nèi)存方式贰拿,會定義一個EOF作為空的事務(wù)蛤袒。所以,當(dāng)我們讀取到一個空的膨更,也就表明日志文件已讀到末尾
if (bytes == null || bytes.length==0) {
throw new EOFException("Failed to read " + logFile);
}
//分析校驗和的值是否正確妙真,防止消息被破壞。這就是為什么我們在append的時候要加入校驗和
Checksum crc = makeChecksumAlgorithm();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue())
throw new IOException(CRC_ERROR);
//反序列事務(wù)
hdr = new TxnHeader();
record = SerializeUtils.deserializeTxn(bytes, hdr);
} catch (EOFException e) {
LOG.debug("EOF exception " + e);
inputStream.close();
inputStream = null;
ia = null;
hdr = null;
//日志文件已讀到末尾了荚守,所以要跳到下一個文件開始讀取
if (!goToNextLog()) {
return false;
}
// if we went to the next log file, we should call next() again
return next();
} catch (IOException e) {
inputStream.close();
throw e;
}
return true;
}
FileTxnLog除了提供對事務(wù)日志的讀寫之外珍德,還提供了其它的一些額外方法练般,下面我們繼續(xù)看些這些方法
getLogFiles:獲取可能包含比目標(biāo)事務(wù)ID大的日志文件的數(shù)組(服務(wù)器啟動并恢復(fù)內(nèi)存數(shù)據(jù)庫的時候會調(diào)用這個方法進(jìn)行內(nèi)存數(shù)據(jù)庫恢復(fù))
/***
*
* @param logDirList 日志文件列表
* @param snapshotZxid 通過內(nèi)存快照恢復(fù)的最大的事務(wù)id,剩余的比snapshotZxid就要從日志文件里恢復(fù)
* @return 找出比包含有<=snapshotZxid的事務(wù)id的日志文件列表,當(dāng)snapshotZxid=0時菱阵,獲取所有的文件
*/
public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
//對日志文件進(jìn)行排序踢俄,按照事務(wù)ID從高到低
List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
long logZxid = 0;
// Find the log file that starts before or at the same time as the
// zxid of the snapshot
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
//如果文件名的事務(wù)id:fzxid>快照的最新的zxid
if (fzxid > snapshotZxid) {
continue;
}
//如果fzxid <= snapshotZxid && fzxid > logZxid
if (fzxid > logZxid) {
logZxid = fzxid;
}
}
List<File> v=new ArrayList<File>(5);
for (File f : files) {
long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
if (fzxid < logZxid) {//找出文件id大于logZxid的文件名
continue;
}
v.add(f);
}
return v.toArray(new File[0]);
}
getLastLoggedZxid獲取最新的事務(wù)ID
//該方法只在節(jié)點(diǎn)服務(wù)器啟動的時候被調(diào)用
public long getLastLoggedZxid() {//從日志文件中獲取最大的Zxid
//找出所有的日志文件并排序(其實可以排序后拿第一個就好了啊晴及?)
File[] files = getLogFiles(logDir.listFiles(), 0);
//排序日志文件都办,并從日志文件名稱上獲取包含最大zxid的日志文件的文件名中的日志id
long maxLog=files.length>0?
Util.getZxidFromName(files[files.length-1].getName(),LOG_FILE_PREFIX):-1;
// 在最新的日志文件里迭代查找最新的事務(wù)ID
long zxid = maxLog;
TxnIterator itr = null;
try {
FileTxnLog txn = new FileTxnLog(logDir);
//根據(jù)文件名的事務(wù)id遍歷迭代該日志文件,獲取整個內(nèi)存數(shù)據(jù)庫的最大事務(wù)id,
itr = txn.read(maxLog);
while (true) {
if(!itr.next())
break;
TxnHeader hdr = itr.getHeader();
zxid = hdr.getZxid();
}
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
} finally {
close(itr);
}
return zxid;
}
commit方法虑稼,提交日志并刷至磁盤琳钉,force方法會把所有未寫磁盤的數(shù)據(jù)都強(qiáng)制寫入磁盤。 這是因為在操作系統(tǒng)中出于性能考慮回把數(shù)據(jù)放入緩沖區(qū)蛛倦,所以不能保證數(shù)據(jù)在調(diào)用write寫入文件通道后就及時寫到磁盤上了歌懒,除非手動調(diào)用force方法。 force方法需要一個布爾參數(shù)溯壶,代表是否把meta data也一并強(qiáng)制寫入及皂。
/***
* 提交事務(wù),確保日志刷新到磁盤中
* @throws IOException
*/
public synchronized void commit() throws IOException {
if (logStream != null) {//刷新
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) {
log.flush();
if (forceSync) {//是否強(qiáng)制刷盤
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
channel.force(false);
syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) {
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely effect operation latency. "
+ "File size is " + channel.size() + " bytes. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) {
streamsToFlush.removeFirst().close();
}
}
truncate:截斷刪除比zxid大的事務(wù)
runcate清空大于給定的zxid事務(wù)日志且改,集群版learner向leader同步的時候验烧,leader告訴learner需要回滾同步調(diào)用Learner#syncWithLeader
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = null;
try {
//找出大于zxid的事務(wù)迭代器
itr = new FileTxnIterator(this.logDir, zxid);
PositionInputStream input = itr.inputStream;
if(input == null) {
throw new IOException("No log files found to truncate! This could " +
"happen if you still have snapshots from an old setup or " +
"log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
}
long pos = input.getPosition();
RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
raf.setLength(pos);//把當(dāng)前l(fā)og后面的部分(比zxid更大的)截斷
raf.close();
while(itr.goToNextLog()) {
if (!itr.logFile.delete()) {//把后面的log文件都刪除
LOG.warn("Unable to truncate {}", itr.logFile);
}
}
} finally {
close(itr);
}
return true;
}
FileTxnIterator
在這里我們發(fā)現(xiàn)在根據(jù)zxid進(jìn)行read的時候會返回一個FileTxnIterator,所以有必要介紹這個FileTxnIterator
public static class FileTxnIterator implements TxnLog.TxnIterator {
File logDir;//日志文件存放目錄
//開始讀取的起始zxid
long zxid;//迭代器的開始zxid又跛,也就是這個迭代器主要是用來存放比我們要查找的zxid大的事務(wù)
TxnHeader hdr;//事務(wù)頭
Record record;
File logFile;//當(dāng)前流指向的文件
InputArchive ia;
static final String CRC_ERROR="CRC check failed";
PositionInputStream inputStream=null;
//存放包含比我們需要查找的zxid大的事務(wù)id的日志文件列表
private ArrayList<File> storedFiles;
}
構(gòu)造函數(shù)
public FileTxnIterator(File logDir, long zxid, boolean fastForward)
throws IOException {
this.logDir = logDir;
this.zxid = zxid;
//過濾出所有需要讀的日志文件碍拆,并利用goToNextLog()方法打開第一個日志日志文件的輸入流
init();
if (fastForward && hdr != null) {
while (hdr.getZxid() < zxid) {
if (!next())
break;
}
}
}
init方法中過濾出所有需要讀的日志文件
void init() throws IOException {
//storedFiles按照事務(wù)id從大到小排序
storedFiles = new ArrayList<File>();
//排序日志文件
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
for (File f: files) {//迭代日志文件并找出可能存在事務(wù)id大于zxid的日志文件
if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
storedFiles.add(f);
}
// 當(dāng)執(zhí)行到這步,說明后面的日志都比給定的zxid小慨蓝,就沒必要繼續(xù)遍歷感混,直接break
else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
storedFiles.add(f);
break;
}
}
//獲打開第一個日志日志文件的輸入流,也就是zxid最小的
goToNextLog();
//next方法用來從日志文件中讀取一條記錄礼烈,校驗并反序列化出來弧满,
//讀取成功返回true,如果讀到了文件末尾則調(diào)goToNextLog()讀下一個文件济丘,以此遞歸直到最后
next();
}
goToNextLog
//打開第一個日志文件輸入流
private boolean goToNextLog() throws IOException {
if (storedFiles.size() > 0) {
this.logFile = storedFiles.remove(storedFiles.size()-1);
ia = createInputArchive(this.logFile);
return true;
}
return false;
}
next
next方法用來從日志文件中讀取一條記錄谱秽,校驗并反序列化出來,讀取成功返回true摹迷,如果讀到了文件末尾調(diào)goToNextLog()讀下一個文件疟赊,以此遞歸直到最后
//讀取下一個事務(wù),并檢查事務(wù)的萬完整性峡碉,包括事務(wù)頭信息
public boolean next() throws IOException {
if (ia == null) {
return false;
}
try {
long crcValue = ia.readLong("crcvalue");
byte[] bytes = Util.readTxnBytes(ia);
// Since we preallocate, we define EOF to be an
if (bytes == null || bytes.length==0) {
throw new EOFException("Failed to read " + logFile);
}
// 檢查文件是否被破壞
Checksum crc = makeChecksumAlgorithm();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue())
throw new IOException(CRC_ERROR);
hdr = new TxnHeader();
//反序列事務(wù)信息
record = SerializeUtils.deserializeTxn(bytes, hdr);
} catch (EOFException e) {
LOG.debug("EOF exception " + e);
inputStream.close();
inputStream = null;
ia = null;
hdr = null;
// 執(zhí)行到這邊意味著文件已經(jīng)讀到末尾了近哟,就要把留指向下一個文件
if (!goToNextLog()) {
return false;
}
// if we went to the next log file, we should call next() again
return next();
} catch (IOException e) {
inputStream.close();
throw e;
}
return true;
}
事務(wù)日志小結(jié):
事務(wù)日志頻繁 flush 到磁盤,消耗大量磁盤 IO
磁盤空間預(yù)分配:事務(wù)日志剩余空間 < 4KB 時鲫寄,將文件大小增加 64 MB
磁盤預(yù)分配的目標(biāo):減少磁盤 seek 次數(shù)
建議:事務(wù)日志吉执,采用獨(dú)立磁盤單獨(dú)存放
事務(wù)序列化:本質(zhì)是生成一個字節(jié)數(shù)組
包含:事務(wù)頭疯淫、事務(wù)體的序列化
事務(wù)體:會話創(chuàng)建事務(wù)、節(jié)點(diǎn)創(chuàng)建事務(wù)戳玫、節(jié)點(diǎn)刪除事務(wù)熙掺、節(jié)點(diǎn)數(shù)據(jù)更新事務(wù)
數(shù)據(jù)相關(guān)過程
ZK 服務(wù)器啟動時,首先會進(jìn)行數(shù)據(jù)初始化咕宿,將磁盤中數(shù)據(jù)币绩,加載到內(nèi)存中,恢復(fù)現(xiàn)場府阀。
數(shù)據(jù)同步
ZK 集群服務(wù)器啟動之后缆镣,會進(jìn)行 2 個動作:
- 選舉 Leader:分配角色
- Learner 向 Leader 服務(wù)器注冊:數(shù)據(jù)同步
數(shù)據(jù)同步,本質(zhì):將沒有在 Learner 上執(zhí)行的事務(wù)试浙,同步給 Learner董瞻。
集群啟動后,什么時候能夠?qū)ν馓峁┓?wù)田巴?需要等所有 Learner 都完成數(shù)據(jù)同步嗎钠糊?
過半策略:只需要半數(shù) Learner 完成數(shù)據(jù)同步,Learder 向所有已經(jīng)完成數(shù)據(jù)同步的 Learner 發(fā)送 UPTODATE 命令壹哺,表示集群具備了對外服務(wù)能力
FREQ
問題1眠蚂、我們知道zookeeper每次生成的事務(wù)日志都帶有當(dāng)前文件的第一條事務(wù)的zxid,這有什么好處呢斗躏?
(1)它可以幫助我們快速的定位某一個事務(wù)操作所在的日志文件。
(2)我們知道昔脯,事務(wù)的zxid中高32位包含了epoch啄糙,這個是leader所屬的周期,因此這樣我們可以通過日志文件名就清楚的知道云稚,當(dāng)前運(yùn)行時的zookeeper所屬的leader周期隧饼。
問題2、在前面静陈,我們知道燕雁,每次append寫入事務(wù)的時,我們都會檢測事務(wù)文件日志當(dāng)前剩余的空間是否大于4kb鲸拥,如果小于4kb拐格,則會在現(xiàn)有的文件基礎(chǔ)上加上64MB,然后使用0來填充刑赶?那么為什么要使用這種預(yù)分配的形式呢捏浊?
我們都知道,對于客戶端每次的事務(wù)提交撞叨,都要將事務(wù)寫入到事務(wù)日志中金踪,所以事務(wù)日志寫入的性能決定了zookeeper對客戶端的請求的響應(yīng)浊洞。也就是說,事務(wù)每次的請求可以看作是一次對底層磁盤的IO操作胡岔。嚴(yán)格的講法希,文件的不斷追加寫入操作會觸發(fā)底層磁盤IO為文件不斷的開辟新的磁盤塊,即磁盤seek靶瘸。因此為了減少seek的頻率苫亦,從而提高zookeeper的IO響應(yīng)的時間,創(chuàng)建事務(wù)日志的時候都會進(jìn)行文件的預(yù)分配--在文件處建之時奕锌,就會向操作系統(tǒng)預(yù)分配一塊很大的磁盤塊著觉,默認(rèn)是64mb,而一旦分配的磁盤塊剩余的空間<4kb惊暴,則會再次分配饼丘,這樣就可以避免隨著每次事務(wù)的寫入過程中導(dǎo)致日志文件的不斷增長而需要不斷的觸發(fā)seek。事務(wù)預(yù)分配的大小辽话,可以通過系統(tǒng)參數(shù)zookeeper.preAllocsize來設(shè)置肄鸽。
問題3、事務(wù)日志文件是如何檢查一個事務(wù)日志文件的完整性呢油啤?
事務(wù)日志文件為了保證和檢查其文件的完整性和數(shù)據(jù)的準(zhǔn)確性典徘。zookeeper在每次事務(wù)操作寫入前,都會根據(jù)系列化的字節(jié)數(shù)組來計算checksum益咬,這樣當(dāng)我們重新載入事務(wù)的時候逮诲,就可以檢查這個事務(wù)文件的完整性了。zookeeper采用Adler32算法來計算checksum幽告。
問題4梅鹦、事務(wù)是什么時候刷盤的?
我們剛才講過冗锁,事務(wù)每次刷盤都是一次IO操作齐唆,所以為了減少刷盤的次數(shù),從而提高響應(yīng)性能冻河,zookeeper會將每次事務(wù)的請求寫入都是先寫到一個緩沖流中箍邮,而并非真正的刷盤到磁盤上去,那么在什么時候輸盤到磁盤中呢叨叙?zookeeper服務(wù)器在啟動的時候會單獨(dú)啟動一個requestProcessor線程來處理這個請求隊列queuedRequests锭弊,如果隊列里面有待處理的事務(wù)請求,則該線程將會取出隊列事務(wù)并寫入到事務(wù)日志文件中摔敛,這個時候的寫入是先寫入到一個緩沖流中廷蓉,當(dāng)requestProcessor統(tǒng)計寫入緩沖流的事務(wù)超過1000或者隊列已經(jīng)沒有事務(wù)了,則會開始將緩沖流中的數(shù)據(jù)刷到磁盤塊中。至于刷盤的方式是可選擇的桃犬,通過配置控制它是異步還是同步刷到磁盤中刹悴。
問題5、事務(wù)日志的截斷方法什么請下會觸發(fā)攒暇?
由于在zookeeper運(yùn)行中土匀,可能由于一些異常情況會導(dǎo)致learner的lastzxid比leader的還大,無論這種情況是怎么發(fā)生的形用,這都是一種不正常的現(xiàn)象就轧。為了遵循一個集群中,只要存在leader田度,那么所有機(jī)器都必須與該leader的數(shù)據(jù)進(jìn)行同步妒御,所以leader會向learner觸發(fā)truc方法,要求這個leaner對日志進(jìn)行截斷镇饺。