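Preface
For every transactional operation it receives (creating, updating, or deleting a node), the ZooKeeper server first appends a record to the transaction log. In addition, once enough transactions have accumulated, the server persists its in-memory data to disk as a snapshot.
When the ZooKeeper server starts, it restores its data from the log and snapshot files.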
The data recovery process
ZooKeeper data recovery has two parts:
- restoring from the snapshot
- restoring from the log
Why are both parts needed?
From the preface we know that every transactional operation is first written to the log, so the log contains a record of every transaction. A snapshot is a file to which ZooKeeper persists the server's data when certain conditions are met. So can we restore the data from the snapshot alone and ignore the log? No. ZooKeeper may have processed some transactions that have not yet triggered the creation of a new snapshot. If the server crashes at that moment and restores only from the most recent snapshot, every transaction between that snapshot and the crash is lost, so the log is needed to recover the transactions that no snapshot has persisted. The other question is whether we can restore from the log alone. Again the answer is no: restoring everything from the log would require keeping the complete log, and replaying the log is also slow.
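The reasoning above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: `RecoverySketch`, `Txn`, and `Snapshot` are hypothetical names, a transaction is reduced to "set a path to a value", and none of this is ZooKeeper's real code. It shows that the state after a crash is the snapshot plus a replay of the logged transactions the snapshot does not cover.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of "snapshot + log replay". These are NOT ZooKeeper's classes.
class RecoverySketch {
    // Hypothetical simplified transaction: "set path to value".
    static final class Txn {
        final long zxid;
        final String path;
        final String value;

        Txn(long zxid, String path, String value) {
            this.zxid = zxid;
            this.path = path;
            this.value = value;
        }
    }

    // Hypothetical snapshot: the full state as of the given zxid.
    static final class Snapshot {
        final long zxid;
        final Map<String, String> state;

        Snapshot(long zxid, Map<String, String> state) {
            this.zxid = zxid;
            this.state = state;
        }
    }

    // Load the snapshot, then replay only the transactions it does not cover.
    static Map<String, String> recover(Snapshot snap, List<Txn> log) {
        Map<String, String> state = new TreeMap<>(snap.state);
        for (Txn t : log) {
            if (t.zxid > snap.zxid) {
                state.put(t.path, t.value);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Snapshot snap = new Snapshot(2, Map.of("/a", "1", "/b", "2"));
        List<Txn> log = List.of(
            new Txn(1, "/a", "1"),
            new Txn(2, "/b", "2"),
            new Txn(3, "/a", "3"),  // written after the snapshot
            new Txn(4, "/c", "4")); // written after the snapshot
        System.out.println(recover(snap, log)); // prints {/a=3, /b=2, /c=4}
    }
}
```

Without the log, transactions 3 and 4 would be lost; without the snapshot, the whole log would have to be kept and replayed from transaction 1.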
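Log and snapshot file format
The figure above lists the log and snapshot files that ZooKeeper generated on my machine.
Note that snapshot files are named snapshot.x and log files are named log.y. In ZooKeeper every transaction is assigned a transaction id (zxid). The zxid is generated on the server side and increases monotonically. For example, if transaction B happens after transaction A and A was assigned zxid 1, then B's zxid is guaranteed to be greater than 1. It may be 2, 3, and so on, depending on whether ZooKeeper processed other transactions between A and B and, if so, how many.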
Meaning of the suffixes in snapshot and log file names
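Snapshot files are named snapshot.x and log files are named log.y, where x and y are zxids written in hexadecimal. For a log file, every transaction recorded in log.y has a zxid greater than or equal to y; in other words, the first transaction in log.y has zxid y. How many transactions does one log file hold? The number is not fixed. ZooKeeper preallocates disk space for a log file in blocks (64 MB by default), so a newly created log file already occupies 64 MB on disk. The purpose is to speed up each log write: because the space is reserved up front, it is likely to be contiguous on disk, which avoids the seek time caused by scattered writes, and the operating system can cache the file in large chunks when reading it, which reduces page-cache churn. For snapshot.x, the snapshot covers transactions with a zxid less than or equal to x; in other words, x is the zxid of the last transaction that had been applied when the snapshot was taken.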
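As a small illustration of the naming rule, the following sketch extracts the zxid from a file name. `ZxidFromName` and `zxidFromName` are hypothetical names chosen for this example; the logic is modeled on what a helper such as `Util.getZxidFromName` is used for later in this article, but it is not copied from ZooKeeper.

```java
// Sketch only: parses the hexadecimal zxid suffix of a snapshot or log file name.
class ZxidFromName {
    // Returns the zxid encoded in names such as "snapshot.30" or "log.27",
    // or -1 if the name does not have the form "<prefix>.<hex>".
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");
        if (parts.length == 2 && parts[0].equals(prefix)) {
            try {
                return Long.parseLong(parts[1], 16);
            } catch (NumberFormatException e) {
                return -1L;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        System.out.println(zxidFromName("snapshot.30", "snapshot")); // prints 48
        System.out.println(zxidFromName("log.27", "log"));           // prints 39
        System.out.println(zxidFromName("log.27", "snapshot"));      // prints -1
    }
}
```

Because the suffix is hexadecimal, comparing file names as plain strings is not reliable; the suffix should be parsed into a number first.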
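Log file contents
The figure above shows the parsed output of a ZooKeeper log file on my machine. Using the highlighted record, the main pieces of information in a log entry are:
- the time the transaction was issued
- the session id of the transaction
- cxid: the client-side sequence number of the operation
- zxid: the server-side transaction id
- the transaction itself
1. operation type (create2)
2. node path: /test/hsbxxxxxxx
3. data stored in the node: #xxxxxx
4. ACL information: v{s{31,s{'world,'anyone}}}
- digest information
1. digest version: 2
2. digest value: 10546528799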
Snapshot file contents
The two figures above show the parsed output of a snapshot file on my machine. A snapshot contains two kinds of information: znodes and sessions.
- znode: at the moment the snapshot is taken, ZooKeeper persists every node that clients have created on the server.
- session: the sessions created by clients are also persisted into the snapshot.
Znode data
A znode mainly stores the following information:
- cZxid: zxid of the transaction that created the node
- ctime: creation time
- mZxid: zxid of the transaction that last modified the node
- mtime: last modification time
- pZxid: zxid of the most recent change to the node's children
- cversion: version of the node's children (the number of changes to its child list)
- dataVersion: version of the node's data
- aclVersion: version of the node's ACL
- ephemeralOwner: for an ephemeral node, the id of the session that owns it
- dataLength: length of the data stored in the node
Some information is not shown, such as the data stored in the node and the node's ACL.
Session information
A session entry mainly stores the following information:
- the session id
- the timeout
- the number of ephemeral nodes owned by the session
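With that background in place, we can now turn to the actual ZooKeeper data recovery flow.
From the above we can infer the following. Find the newest uncorrupted snapshot, snapshot.x_newest. Then, based on x, collect every log.m whose suffix is greater than x (call this set log<m>) together with log.y_less_but_max, the log with the largest suffix that is not greater than x. With snapshot.x_newest, log<m>, and log.y_less_but_max, ZooKeeper's data can be restored. For the files shown in the figure log_snap_filename_format.png, snapshot.30 and log.27 are enough to complete the recovery.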
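The selection rule can be sketched as follows. `LogSelectionSketch` and `logsNeeded` are hypothetical names, the file list in `main` is made up for illustration, and the method works on already-parsed zxid suffixes rather than on real files.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch only: decides which log files must be replayed on top of a snapshot.
class LogSelectionSketch {
    // Given the zxid suffixes of all log files and the zxid of the chosen
    // snapshot, return the suffixes of the log files that must be replayed.
    static List<Long> logsNeeded(List<Long> logZxids, long snapshotZxid) {
        // Largest log suffix that is not greater than the snapshot zxid:
        // that file may still contain transactions newer than the snapshot.
        long start = 0L;
        for (long z : logZxids) {
            if (z <= snapshotZxid && z > start) {
                start = z;
            }
        }
        // Keep that file and every later one.
        List<Long> needed = new ArrayList<>();
        for (long z : logZxids) {
            if (z >= start) {
                needed.add(z);
            }
        }
        Collections.sort(needed);
        return needed;
    }

    public static void main(String[] args) {
        List<Long> logs = List.of(0x1L, 0x27L, 0x35L); // hypothetical log.1, log.27, log.35
        for (long z : logsNeeded(logs, 0x30L)) {       // snapshot.30
            System.out.println("log." + Long.toHexString(z));
        }
        // prints log.27 and log.35; log.1 is fully covered by the snapshot
    }
}
```

The log with the largest suffix below the snapshot zxid has to be included because it started before the snapshot and may continue past it.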
ZooKeeperServer.startdata
At startup, ZooKeeper restores its data through the ZooKeeperServer.startdata method.
public void startdata() throws IOException, InterruptedException {
//check to see if zkDb is not null
// ZKDatabase is ZooKeeper's data storage object
if (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);
}
if (!zkDb.isInitialized()) {
// perform the data recovery
loadData();
}
}
loadData
public void loadData() throws IOException, InterruptedException {
if (zkDb.isInitialized()) {
setZxid(zkDb.getDataTreeLastProcessedZxid());
} else {
// restore the data via zkDb.loadDataBase()
setZxid(zkDb.loadDataBase());
}
// Clean up dead sessions
// a session that still owns ephemeral nodes in the data tree but has no entry in the session table is dead
List<Long> deadSessions = new ArrayList<>();
for (Long session : zkDb.getSessions()) {
if (zkDb.getSessionWithTimeOuts().get(session) == null) {
deadSessions.add(session);
}
}
for (long session : deadSessions) {
// TODO: Is lastProcessedZxid really the best thing to use?
killSession(session, zkDb.getDataTreeLastProcessedZxid());
}
// Make a clean snapshot of the database once recovery has finished
takeSnapshot();
}
ZKDatabase.loadDataBase()
ZooKeeper data recovery essentially means repopulating the fields of ZKDatabase.
public long loadDataBase() throws IOException {
long startTime = Time.currentElapsedTime();
// snapLog restores the data into dataTree and sessionsWithTimeouts and returns the highest zxid seen during recovery
// dataTree holds ZooKeeper's node data; sessionsWithTimeouts holds the session data
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
long loadTime = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
loadTime, Long.toHexString(zxid), dataTree.getTreeDigest());
return zxid;
}
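This is a good point to introduce DataTree.
DataTree
DataTree is ZooKeeper's data storage engine class. The figure below is a screenshot of its key fields.
From the figure above we can see that the nodes field stores all of ZooKeeper's nodes.
Internally ZooKeeper uses a ConcurrentHashMap as the node container: the key is the node's path and the value is a DataNode, the class that represents a node. The fields of DataNode are as follows.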
snapLog.restore
Next in the recovery call chain is the snapLog.restore method.
public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
long snapLoadingStartTime = Time.currentElapsedTime();
// snapLog represents the snapshot files; snapLog.deserialize restores the snapshot data
long deserializeResult = snapLog.deserialize(dt, sessions);
ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
// create the object that represents the transaction log files
FileTxnLog txnLog = new FileTxnLog(dataDir);
boolean trustEmptyDB;
File initFile = new File(dataDir.getParent(), "initialize");
if (Files.deleteIfExists(initFile.toPath())) {
LOG.info("Initialize file found, an empty database will not block voting participation");
trustEmptyDB = true;
} else {
trustEmptyDB = autoCreateDB;
}
// the log replay logic is implemented in RestoreFinalizer.run
RestoreFinalizer finalizer = () -> {
long highestZxid = fastForwardFromEdits(dt, sessions, listener);
// The snapshotZxidDigest will reset after replaying the txn of the
// zxid in the snapshotZxidDigest, if it's not reset to null after
// restoring, it means either there are not enough txns to cover that
// zxid or that txn is missing
DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
if (snapshotZxidDigest != null) {
LOG.warn(
"Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
+ "which might lead to inconsistent state",
Long.toHexString(highestZxid),
Long.toHexString(snapshotZxidDigest.getZxid()));
}
return highestZxid;
};
if (-1L == deserializeResult) {
/* this means that we couldn't find any snapshot, so we need to
* initialize an empty database (reported in ZOOKEEPER-2325) */
if (txnLog.getLastLoggedZxid() != -1) {
// ZOOKEEPER-3056: provides an escape hatch for users upgrading
// from old versions of zookeeper (3.4.x, pre 3.5.3).
if (!trustEmptySnapshot) {
throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
} else {
LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
return finalizer.run();
}
}
if (trustEmptyDB) {
/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
* or use Map on save() */
save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false);
/* return a zxid of 0, since we know the database is empty */
return 0L;
} else {
/* return a zxid of -1, since we are possibly missing data */
LOG.warn("Unexpected empty data tree, setting zxid to -1");
dt.lastProcessedZxid = -1L;
return -1L;
}
}
return finalizer.run();
}
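The branches at the end of restore are easy to lose track of, so here is a compact restatement as a pure function. `RestoreDecisionSketch`, `Outcome`, and `decide` are hypothetical names introduced for this illustration; the four boolean inputs stand for "a snapshot was deserialized", "the transaction log contains at least one transaction", trustEmptySnapshot, and trustEmptyDB in the code above.

```java
// Sketch only: the decision table of restore(), reduced to its inputs and outcomes.
class RestoreDecisionSketch {
    enum Outcome {
        REPLAY_LOGS,          // run the finalizer: replay the log on top of what was loaded
        FAIL,                 // no snapshot but the log has transactions: refuse to start
        EMPTY_DB_ZXID_0,      // trusted empty database: save it and return zxid 0
        UNKNOWN_ZXID_MINUS_1  // possibly missing data: return zxid -1
    }

    static Outcome decide(boolean snapshotFound, boolean logHasTxns,
                          boolean trustEmptySnapshot, boolean trustEmptyDB) {
        if (snapshotFound) {
            return Outcome.REPLAY_LOGS;
        }
        if (logHasTxns) {
            // Escape hatch for upgrades; otherwise a missing snapshot is an error.
            return trustEmptySnapshot ? Outcome.REPLAY_LOGS : Outcome.FAIL;
        }
        return trustEmptyDB ? Outcome.EMPTY_DB_ZXID_0 : Outcome.UNKNOWN_ZXID_MINUS_1;
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true, false, false)); // prints FAIL
        System.out.println(decide(false, false, true, true));  // prints EMPTY_DB_ZXID_0
    }
}
```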
SnapShot.deserialize
Let's look at the source code that restores data from a snapshot.
public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
// we run through 100 snapshots (not all of them)
// if we cannot get it running within 100 snapshots
// we should give up
// findNValidSnapshots returns at most 100 snapshot files from the snapshot directory, ordered by their zxid suffix from largest to smallest
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
long snapZxid = -1;
boolean foundValid = false;
// try each snapshot.x file in snapList in turn; as soon as one is restored successfully, the remaining files are not parsed
for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
// the snapshot file currently being parsed
snap = snapList.get(i);
LOG.info("Reading snapshot {}", snap);
// the suffix of this snapshot file, i.e. the highest zxid the snapshot covers
snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
// wrap the snapshot's input stream in an archive reader
InputArchive ia = BinaryInputArchive.getArchive(snapIS);
// parse the node and session data stored in the snapshot
deserialize(dt, sessions, ia);
// verify the checksum to make sure the snapshot data is not corrupted
SnapStream.checkSealIntegrity(snapIS, ia);
// Digest feature was added after the CRC to make it backward
// compatible, the older code can still read snapshots which
// includes digest.
//
// To check the intact, after adding digest we added another
// CRC check.
// parse the digest
if (dt.deserializeZxidDigest(ia, snapZxid)) {
// verify the checksum again now that the digest has been read
SnapStream.checkSealIntegrity(snapIS, ia);
}
// success: stop parsing the remaining snapshot files
foundValid = true;
break;
} catch (IOException e) {
LOG.warn("problem reading snap file {}", snap, e);
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
// record snapZxid as the latest zxid currently reflected in the DataTree
dt.lastProcessedZxid = snapZxid;
lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);
// compare the digest if this is not a fuzzy snapshot, we want to compare
// and find inconsistent asap.
if (dt.getDigestFromLoadedSnapshot() != null) {
dt.compareSnapshotDigests(dt.lastProcessedZxid);
}
return dt.lastProcessedZxid;
}
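The "try the newest snapshot, fall back to older ones if it is corrupted" pattern can be sketched in isolation. Everything here is an assumption made for illustration: `SnapshotFallbackSketch` is a hypothetical class, the file layout (length, payload, trailing checksum) is invented and is not ZooKeeper's snapshot format, and files are plain byte arrays. The sketch uses java.util.zip.Adler32 with checked streams, in the same spirit as the CheckedInputStream in the code above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;

// Sketch only: checksum-sealed "snapshots" with newest-first fallback.
class SnapshotFallbackSketch {
    // Invented layout: [int length][payload][long Adler32 of the bytes before it].
    static byte[] write(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        CheckedOutputStream checked = new CheckedOutputStream(bytes, new Adler32());
        DataOutputStream out = new DataOutputStream(checked);
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
        long seal = checked.getChecksum().getValue(); // checksum of length + payload
        out.writeLong(seal);
        out.flush();
        return bytes.toByteArray();
    }

    // Reads one file and throws IOException if it is truncated or corrupted.
    static byte[] read(byte[] file) throws IOException {
        CheckedInputStream checked =
            new CheckedInputStream(new ByteArrayInputStream(file), new Adler32());
        DataInputStream in = new DataInputStream(checked);
        int length = in.readInt();
        if (length < 0 || length > file.length) {
            throw new IOException("implausible payload length " + length);
        }
        byte[] payload = new byte[length];
        in.readFully(payload);
        long computed = checked.getChecksum().getValue(); // taken before reading the seal
        long stored = in.readLong();
        if (stored != computed) {
            throw new IOException("checksum mismatch");
        }
        return payload;
    }

    // Tries the candidates newest first and returns the first one that validates.
    static byte[] loadNewestValid(List<byte[]> newestFirst) throws IOException {
        for (byte[] candidate : newestFirst) {
            try {
                return read(candidate);
            } catch (IOException e) {
                // corrupted or truncated: fall back to the next older candidate
            }
        }
        throw new IOException("Not able to find valid snapshots");
    }
}
```

Reading the stored checksum only after capturing the computed one matters, because the checked stream keeps updating its checksum with every byte it reads.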
deserialize
The deserialize method that parses a snapshot file:
public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
FileHeader header = new FileHeader();
// first deserialize the file header (FileHeader)
header.deserialize(ia, "fileheader");
if (header.getMagic() != SNAP_MAGIC) {
throw new IOException("mismatching magic headers " + header.getMagic() + " != " + FileSnap.SNAP_MAGIC);
}
// then deserialize the sessions and the DataTree that follow the header
SerializeUtils.deserializeSnapshot(dt, ia, sessions);
}
SerializeUtils.deserializeSnapshot
public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map<Long, Integer> sessions) throws IOException {
// restore the session information first
int count = ia.readInt("count");
while (count > 0) {
// each session consists of two fields, id and timeout, which are deserialized from the file in turn
long id = ia.readLong("id");
int to = ia.readInt("timeout");
sessions.put(id, to);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.SESSION_TRACE_MASK,
"loadData --- session in archive: " + id + " with timeout: " + to);
}
count--;
}
// then deserialize the DataTree object
dt.deserialize(ia, "tree");
}
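The session section is a count followed by that many (id, timeout) pairs. The round trip below sketches this layout with plain java.io data streams. `SessionSectionSketch` and its methods are hypothetical names, and using DataOutputStream and DataInput in place of ZooKeeper's archive classes is an assumption made to keep the example self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: count-prefixed list of (session id, timeout) pairs.
class SessionSectionSketch {
    static byte[] writeSessions(Map<Long, Integer> sessions) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(sessions.size());      // "count"
        for (Map.Entry<Long, Integer> e : sessions.entrySet()) {
            out.writeLong(e.getKey());      // "id"
            out.writeInt(e.getValue());     // "timeout"
        }
        out.flush();
        return bytes.toByteArray();
    }

    static Map<Long, Integer> readSessions(DataInput in) throws IOException {
        Map<Long, Integer> sessions = new LinkedHashMap<>();
        int count = in.readInt();
        while (count > 0) {
            long id = in.readLong();
            int timeout = in.readInt();
            sessions.put(id, timeout);
            count--;
        }
        return sessions;
    }
}
```

A reader would wrap the bytes in `new DataInputStream(new ByteArrayInputStream(bytes))` and pass it to `readSessions`.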
DataTree.deserialize()
public void deserialize(InputArchive ia, String tag) throws IOException {
// deserialize the ACL information first
aclCache.deserialize(ia);
nodes.clear();
pTrie.clear();
nodeDataSize.set(0);
// read the nodes one by one; the path "/" marks the end of the node list
String path = ia.readString("path");
while (!"/".equals(path)) {
DataNode node = new DataNode();
// deserialize the DataNode object
ia.readRecord(node, "node");
nodes.put(path, node);
synchronized (node) {
aclCache.addUsage(node.acl);
}
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1) {
root = node;
} else {
String parentPath = path.substring(0, lastSlash);
DataNode parent = nodes.get(parentPath);
if (parent == null) {
throw new IOException("Invalid Datatree, unable to find "
+ "parent "
+ parentPath
+ " of path "
+ path);
}
// register this node as a child of its parent
parent.addChild(path.substring(lastSlash + 1));
long eowner = node.stat.getEphemeralOwner();
EphemeralType ephemeralType = EphemeralType.get(eowner);
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (eowner != 0) {
HashSet<String> list = ephemerals.get(eowner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(eowner, list);
}
list.add(path);
}
}
path = ia.readString("path");
}
// have counted digest for root node with "", ignore here to avoid
// counting twice for root node
nodes.putWithoutDigest("/", root);
nodeDataSize.set(approximateDataSize());
// we are done with deserializing the
// the datatree
// update the quotas - create path trie
// and also update the stat nodes
// update the user-defined quota information of each node
setupQuota();
// purge the ACL entries that are no longer in use
aclCache.purgeUnused();
}
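The parent-linking part of this loop depends on the snapshot listing every parent before its children. The sketch below isolates that idea. `TreeRebuildSketch` and `Node` are hypothetical names, nodes carry only a child set, and the empty string stands for the root path as in the code above.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: rebuilds parent/child links from paths listed parent-first.
class TreeRebuildSketch {
    static final class Node {
        final Set<String> children = new TreeSet<>();
    }

    static Map<String, Node> rebuild(List<String> pathsInSnapshotOrder) throws IOException {
        Map<String, Node> nodes = new HashMap<>();
        for (String path : pathsInSnapshotOrder) {
            Node node = new Node();
            nodes.put(path, node);
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                continue; // the root ("") has no parent
            }
            String parentPath = path.substring(0, lastSlash);
            Node parent = nodes.get(parentPath);
            if (parent == null) {
                // A child appeared before its parent: the input is not a valid tree.
                throw new IOException("unable to find parent " + parentPath + " of path " + path);
            }
            parent.children.add(path.substring(lastSlash + 1));
        }
        return nodes;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Node> nodes = rebuild(List.of("", "/test", "/test/a", "/test/b"));
        System.out.println(nodes.get("").children);      // prints [test]
        System.out.println(nodes.get("/test").children); // prints [a, b]
    }
}
```

Because each path is resolved against nodes that were already read, a single pass over the file is enough and no second linking pass is needed.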
That covers restoring from the snapshot. Next, let's look at restoring from the log files.
RestoreFinalizer.run
long highestZxid = fastForwardFromEdits(dt, sessions, listener);
// The snapshotZxidDigest will reset after replaying the txn of the
// zxid in the snapshotZxidDigest, if it's not reset to null after
// restoring, it means either there are not enough txns to cover that
// zxid or that txn is missing
DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
if (snapshotZxidDigest != null) {
LOG.warn(
"Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
+ "which might lead to inconsistent state",
Long.toHexString(highestZxid),
Long.toHexString(snapshotZxidDigest.getZxid()));
}
return highestZxid;
fastForwardFromEdits
public long fastForwardFromEdits(
DataTree dt,
Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
// txnLog.read(dt.lastProcessedZxid + 1) returns an iterator over every log file whose suffix is greater than lastProcessedZxid, plus the log file with the largest suffix below lastProcessedZxid + 1
TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
int txnLoaded = 0;
long startTime = Time.currentElapsedTime();
try {
while (true) {
// iterator points to
// the first valid txn when initialized
hdr = itr.getHeader();
if (hdr == null) {
//empty logs
return dt.lastProcessedZxid;
}
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
// a transaction whose zxid is lower than the current highestZxid is out of order: log an error
LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType());
} else {
// otherwise advance highestZxid to the zxid of this transaction
highestZxid = hdr.getZxid();
}
try {
// apply this logged transaction to the DataTree; the details are not covered here and will be explained in the analysis of how ZooKeeper creates a node
processTransaction(hdr, dt, sessions, itr.getTxn());
dt.compareDigest(hdr, itr.getTxn(), itr.getDigest());
txnLoaded++;
} catch (KeeperException.NoNodeException e) {
throw new IOException("Failed to process transaction type: "
+ hdr.getType()
+ " error: "
+ e.getMessage(),
e);
}
listener.onTxnLoaded(hdr, itr.getTxn(), itr.getDigest());
if (!itr.next()) {
break;
}
}
} finally {
if (itr != null) {
itr.close();
}
}
long loadTime = Time.currentElapsedTime() - startTime;
LOG.info("{} txns loaded in {} ms", txnLoaded, loadTime);
ServerMetrics.getMetrics().STARTUP_TXNS_LOADED.add(txnLoaded);
ServerMetrics.getMetrics().STARTUP_TXNS_LOAD_TIME.add(loadTime);
// return the highest zxid that ZooKeeper has processed
return highestZxid;
}
TxnIterator.next()
next() parses the next record in the log file.
public boolean next() throws IOException {
if (ia == null) {
return false;
}
try {
// read the CRC value stored with the log entry
long crcValue = ia.readLong("crcvalue");
// read the bytes of one log entry
byte[] bytes = Util.readTxnBytes(ia);
// Since we preallocate, we define EOF to be an
if (bytes == null || bytes.length == 0) {
throw new EOFException("Failed to read " + logFile);
}
// EOF or corrupted record
// validate CRC
Checksum crc = makeChecksumAlgorithm();
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue()) {
throw new IOException(CRC_ERROR);
}
// convert the raw bytes of the entry into a TxnLogEntry object
TxnLogEntry logEntry = SerializeUtils.deserializeTxn(bytes);
hdr = logEntry.getHeader();
// record is the body of the logged operation
record = logEntry.getTxn();
// the digest of the entry
digest = logEntry.getDigest();
} catch (EOFException e) {
LOG.debug("EOF exception", e);
inputStream.close();
inputStream = null;
ia = null;
hdr = null;
// this means that the file has ended
// we should go to the next file
// this log file has been fully parsed: open a reader for the next log file
if (!goToNextLog()) {
return false;
}
// if we went to the next log file, we should call next() again
// parse the first record of the next file
return next();
} catch (IOException e) {
inputStream.close();
throw e;
}
return true;
}
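The structure of next() (read a checksum, read the entry bytes, treat an empty entry as end of file because of preallocation, validate the checksum, move to the next file at end of file) can be sketched on its own. `TxnRecordSketch` is a hypothetical class, the record framing is invented for this example and is simpler than ZooKeeper's real log format, and files are plain byte arrays. Adler32 is used as the checksum here as an assumption about what makeChecksumAlgorithm() returns.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Adler32;

// Sketch only: checksum-validated records spread over several preallocated files.
class TxnRecordSketch {
    static final int MAX_RECORD = 1 << 20; // arbitrary sanity limit for this sketch

    // Invented framing: [long checksum][int length][payload bytes].
    static void writeRecord(DataOutputStream out, byte[] payload) throws IOException {
        Adler32 crc = new Adler32();
        crc.update(payload, 0, payload.length);
        out.writeLong(crc.getValue());
        out.writeInt(payload.length);
        out.write(payload);
    }

    // Returns the next payload, or null when the file has no more records.
    static byte[] readRecord(DataInputStream in) throws IOException {
        try {
            long stored = in.readLong();
            int length = in.readInt();
            if (length == 0) {
                return null; // zero padding left by preallocation counts as end of file
            }
            if (length < 0 || length > MAX_RECORD) {
                throw new IOException("implausible record length " + length);
            }
            byte[] payload = new byte[length];
            in.readFully(payload);
            Adler32 crc = new Adler32();
            crc.update(payload, 0, payload.length);
            if (crc.getValue() != stored) {
                throw new IOException("CRC check failed");
            }
            return payload;
        } catch (EOFException e) {
            return null; // ran off the end of the file
        }
    }

    // Reads every record from every file, moving on to the next file at end of file.
    static List<byte[]> readAll(List<byte[]> filesInOrder) throws IOException {
        List<byte[]> records = new ArrayList<>();
        for (byte[] file : filesInOrder) {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(file));
            for (byte[] r = readRecord(in); r != null; r = readRecord(in)) {
                records.add(r);
            }
        }
        return records;
    }
}
```

As in next(), a failed checksum is a hard error, while running out of data only ends the current file.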
Conclusion
That is the flow and the source code of ZooKeeper data recovery.