In Zookeeper, data storage is split into two parts: in-memory storage and on-disk storage. This article analyzes how the in-memory database is initialized at server startup and how data is synchronized between the leader and the learner servers. Before diving in, let's introduce the basic classes involved in data storage.
DataTree
Zookeeper's data model is a tree. DataTree is the core of in-memory data storage: it represents a complete, up-to-date copy of the data in memory, including all node paths, node data, ACL information, and the corresponding watches. Its main fields are:
//Key is the node path, value is the node's data (DataNode). Holds all znodes in real time; a ConcurrentHashMap guarantees safe concurrent access
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
//Watches on node data
private final WatchManager dataWatches = new WatchManager();
//Watches on a node's child list
private final WatchManager childWatches = new WatchManager();
//Key is the sessionId, value is the set of ephemeral node paths owned by that session, for fast access and cleanup
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
//This set contains the paths of all container nodes
private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
//This set contains the paths of all ttl nodes
private final Set<String> ttls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
//The largest zxid applied to the in-memory database
public volatile long lastProcessedZxid = 0;
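To make the relationship between nodes and ephemerals concrete, here is a minimal sketch, assuming simplified types (byte[] in place of DataNode) and illustrative method names; this is not the real DataTree code:

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DataTreeSketch {
    //Path -> node data, as in DataTree.nodes (value simplified to byte[])
    private final ConcurrentHashMap<String, byte[]> nodes = new ConcurrentHashMap<>();
    //sessionId -> paths of the ephemeral nodes that session owns
    private final Map<Long, Set<String>> ephemerals = new ConcurrentHashMap<>();

    public void createNode(String path, byte[] data, long ephemeralOwner) {
        nodes.put(path, data); //index the node by its full path
        if (ephemeralOwner != 0) { //session-bound (ephemeral) node
            ephemerals.computeIfAbsent(ephemeralOwner,
                    k -> Collections.synchronizedSet(new HashSet<>())).add(path);
        }
    }

    //When a session closes, its ephemerals can be removed in one pass
    public void killSession(long sessionId) {
        Set<String> paths = ephemerals.remove(sessionId);
        if (paths != null) {
            paths.forEach(nodes::remove);
        }
    }
}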
DataNode
The smallest unit of data storage. It contains the node's data content, node status, child list, and the operations on its children. Main fields:
//Node data content
byte data[];
Long acl; //Index of this node's ACL in the DataTree's ACL cache
//Node status: persisted metadata such as ephemeralOwner, czxid, etc.
public StatPersisted stat;
//Names of this node's children, relative to its own path (the parent path itself is not included)
private Set<String> children = null;
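Because children holds only names relative to the parent, resolving a child's full path is plain concatenation; an illustrative helper, not part of DataNode:

//Illustrative only: children contains "child1", not "/parent/child1"
static String childFullPath(String parentPath, String childName) {
    return "/".equals(parentPath) ? "/" + childName : parentPath + "/" + childName;
}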
ZKDatabase
Zookeeper's in-memory database. It manages all of Zookeeper's sessions, the DataTree storage, and the transaction log. It periodically dumps snapshot data to disk (driven mainly by snapCount), and at server startup it rebuilds the complete in-memory database from the transaction logs and snapshot files on disk. Main fields:
protected DataTree dataTree;
//Key is the sessionId, value is that session's negotiated timeout
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
//Class that reads and writes the transaction log files and snapshot files on disk
protected FileTxnSnapLog snapLog;
//Used during leader-learner data synchronization
protected long minCommittedLog, maxCommittedLog;
public static final int commitLogCount = 500;
protected static int commitLogBuffer = 700;
//Cache of the most recently committed proposals, used for fast follower synchronization
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
File storage consists of transaction log files and snapshot files, handled by the FileTxnLog and FileSnap classes respectively.
FileTxnLog
Implements the TxnLog interface, providing APIs to read and append transaction logs. First, the on-disk format of a transaction log file:
LogFile:
//A log file consists of the following three parts
* FileHeader TxnList ZeroPad
//1. The file header
* FileHeader: {
* magic 4bytes (ZKLG)
* version 4bytes
* dbid 8bytes
* }
//2. The transaction list
* TxnList:
* Txn || Txn TxnList
* Txn:
//Layout of a single transaction entry
* checksum Txnlen TxnHeader Record 0x42
* checksum: 8bytes Adler32 is currently used
* calculated across payload -- Txnlen, TxnHeader, Record and 0x42
*
* Txnlen:
* len 4bytes
*
* TxnHeader: {
* sessionid 8bytes
* cxid 4bytes
* zxid 8bytes
* time 8bytes
* type 4bytes
* }
*
* Record:
* See Jute definition file for details on the various record types
*
* ZeroPad:
* 0 padded to EOF (filled during preallocation stage)
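As a quick illustration of the FileHeader layout, the three header fields can be read back with a plain DataInputStream (jute's BinaryOutputArchive writes fixed-width integers big-endian, which matches); the file name here is made up:

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public class LogHeaderReader {
    public static void main(String[] args) throws IOException {
        //Illustrative path; real logs live under dataDir/version-2/
        try (DataInputStream in = new DataInputStream(
                new FileInputStream("version-2/log.100000001"))) {
            int magic = in.readInt();   //4 bytes, "ZKLG" == 0x5a4b4c47
            int version = in.readInt(); //4 bytes
            long dbid = in.readLong();  //8 bytes
            System.out.printf("magic=%08x version=%d dbid=%d%n", magic, version, dbid);
        }
    }
}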
The two operations analyzed below are appending to the log and truncating the log.
Appending to the log
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr == null) {
return false;
}
//lastZxidSeen: the largest (most recent) zxid seen so far
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
//If no log file is open for writing, attach a new file stream, write the FileHeader, and force it to disk right away
if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
logStream.flush();
filePadding.setCurrentSize(fos.getChannel().position());
streamsToFlush.add(fos);
}
//Check whether the log file needs to grow (preallocation)
filePadding.padFile(fos.getChannel());
//Serialize the transaction
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
//Compute the checksum
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
//Write to the log file stream
Util.writeTxnBytes(oa, buf);
return true;
}
The main steps are:
1. Check whether a log file is open for writing
When the server performs its first transaction write after startup, or when the previous log file has been filled, no log file is attached. When logStream == null, a new file stream is attached, the FileHeader is written, and the stream is immediately forced to disk.
2. Check whether the log file needs to grow (preallocation)
long padFile(FileChannel fileChannel) throws IOException {
//currentSize: the current (preallocated) size of the file
//preAllocSize: 64MB by default
long newFileSize = calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
if (currentSize != newFileSize) {
fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
currentSize = newFileSize;
}
return currentSize;
}
//Decide whether the file needs to grow
public static long calculateFileSizeWithPadding(long position, long fileSize, long preAllocSize) {
// If preAllocSize is positive and we are within 4KB of the known end of the file calculate a new file size
if (preAllocSize > 0 && position + 4096 >= fileSize) {
// If we have written more than we have previously preallocated we need to make sure the new
// file size is larger than what we already have
if (position > fileSize) {
fileSize = position + preAllocSize;
fileSize -= fileSize % preAllocSize;
} else {
fileSize += preAllocSize;
}
}
return fileSize;
}
As calculateFileSizeWithPadding shows, once the write position comes within 4KB of the current end of the file, currentSize is extended by another preAllocSize (64MB by default) and the unwritten region is filled with zeros. The benefit is avoiding fresh disk block allocation on every write, which reduces disk seeks.
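A few worked values, assuming the default 64MB preAllocSize:

long kb = 1024, mb = 1024 * kb;
//Far from the preallocated end: the size is unchanged
assert calculateFileSizeWithPadding(10 * mb, 64 * mb, 64 * mb) == 64 * mb;
//Within 4KB of the end: grow by one more preAllocSize block
assert calculateFileSizeWithPadding(64 * mb - 2 * kb, 64 * mb, 64 * mb) == 128 * mb;
//Position already past the known size: round up past the position
assert calculateFileSizeWithPadding(65 * mb, 64 * mb, 64 * mb) == 128 * mb;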
3. Serialize the transaction
The transaction header (TxnHeader) and transaction body (Record) are serialized separately; see Zookeeper source code analysis (5) - serialization and protocol.
4. Compute the checksum
The checksum (currently Adler32) lets the integrity and correctness of the log file contents be verified.
5. Write to the log file stream
The transaction header, transaction body, and checksum are written to the file stream. Because the output stream is a BufferedOutputStream, the bytes first sit in its buffer and are not actually written out yet.
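The buffered bytes reach disk later, when the log is committed. A simplified sketch modeled on FileTxnLog.commit, with the metric collection and slow-fsync warning logic left out:

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class TxnLogFlusher {
    private BufferedOutputStream logStream;
    private final List<FileOutputStream> streamsToFlush = new ArrayList<>();
    private final boolean forceSync = true; //zookeeper.forceSync=yes by default

    public synchronized void commit() throws IOException {
        if (logStream != null) {
            logStream.flush(); //drain the in-process buffer to the OS
        }
        for (FileOutputStream log : streamsToFlush) {
            log.flush();
            if (forceSync) {
                log.getChannel().force(false); //fsync to physical storage
            }
        }
    }
}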
Truncating the log
During leader-learner synchronization, if a learner's transaction ID is larger than the leader's, the learner is asked to discard the transaction log entries whose IDs are larger than the leader's.
FileTxnIterator
A transaction log iterator that can start at a given zxid: if an iterator is created starting at zxid=11, every transaction on that server from zxid=11 onward is reachable through it. Its main fields are:
public static class FileTxnIterator implements TxnLog.TxnIterator {
//Directory containing the transaction logs
File logDir;
//The zxid the iterator starts from
long zxid;
//Header of the transaction the iterator currently points at
TxnHeader hdr;
//Body of the transaction the iterator currently points at
Record record;
//The log file containing the starting zxid
File logFile;
//Input archive
InputArchive ia;
static final String CRC_ERROR="CRC check failed";
//Input stream that tracks the current read position
PositionInputStream inputStream=null;
//Log files newer than the one containing the starting zxid
private ArrayList<File> storedFiles;
... code omitted ...
}
public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = null;
try {
itr = new FileTxnIterator(this.logDir, zxid);
PositionInputStream input = itr.inputStream;
if(input == null) {
throw new IOException("No log files found to truncate! This could " +
"happen if you still have snapshots from an old setup or " +
"log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
}
long pos = input.getPosition();
// now, truncate at the current position
RandomAccessFile raf=new RandomAccessFile(itr.logFile,"rw");
raf.setLength(pos);
raf.close();
while(itr.goToNextLog()) {
if (!itr.logFile.delete()) {
LOG.warn("Unable to truncate {}", itr.logFile);
}
}
} finally {
close(itr);
}
return true;
}
As the code shows, truncation deletes the transactions after the given zxid's position in the log file that contains it, and then deletes every log file newer than that one.
FileSnap
A data snapshot records the full in-memory data of a Zookeeper server at a point in time and writes it to a given location on disk. It stores the DataTree and the session information. FileSnap provides the snapshot interface: storing, serializing, deserializing, and locating snapshot files.
FileTxnSnapLog
Wraps TxnLog and SnapShot, providing the restore method for rebuilding the in-memory database from disk and the save method for writing snapshots. Main fields:
//The directory containing the transaction logs
private final File dataDir;
//The directory containing the snapshots
private final File snapDir;
private TxnLog txnLog;
private SnapShot snapLog;
// Version number
public final static int VERSION = 2;
// Prefix of the version directory name
public final static String version = "version-";
First, the save method, which writes a snapshot:
//syncSnap: sync the snapshot immediately after write
public void save(DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
boolean syncSnap)
throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
snapshotFile);
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
}
The flow: build the snapshot file name from the DataTree's latest processed transaction id, then serialize the DataTree contents and sessionsWithTimeouts (the session information) into that file at the configured disk location.
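The file name simply encodes the zxid in hex, so a lastProcessedZxid of 0x200000003 yields snapshot.200000003. Util.makeSnapshotName amounts to:

static String makeSnapshotName(long zxid) {
    return "snapshot." + Long.toHexString(zxid);
}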
Data initialization during server startup
This is the process of deserializing the newest snapshot file on disk (the full data) plus the transaction logs written after it (the incremental data) into the in-memory database.
Back in Zookeeper source code analysis (1) - server startup flow: at startup the server first initializes FileTxnSnapLog and then initializes ZKDatabase.
1. Initializing FileTxnSnapLog
public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);
this.dataDir = new File(dataDir, version + VERSION);
this.snapDir = new File(snapDir, version + VERSION);
// by default create snap/log dirs, but otherwise complain instead
// See ZOOKEEPER-1161 for more details
boolean enableAutocreate = Boolean.valueOf(
System.getProperty(ZOOKEEPER_DATADIR_AUTOCREATE,
ZOOKEEPER_DATADIR_AUTOCREATE_DEFAULT));
if (!this.dataDir.exists()) {
if (!enableAutocreate) {
throw new DatadirException("Missing data directory "
+ this.dataDir
+ ", automatic data directory creation is disabled ("
+ ZOOKEEPER_DATADIR_AUTOCREATE
+ " is false). Please create this directory manually.");
}
if (!this.dataDir.mkdirs()) {
throw new DatadirException("Unable to create data directory "
+ this.dataDir);
}
}
if (!this.dataDir.canWrite()) {
throw new DatadirException("Cannot write to data directory " + this.dataDir);
}
if (!this.snapDir.exists()) {
// by default create this directory, but otherwise complain instead
// See ZOOKEEPER-1161 for more details
if (!enableAutocreate) {
throw new DatadirException("Missing snap directory "
+ this.snapDir
+ ", automatic data directory creation is disabled ("
+ ZOOKEEPER_DATADIR_AUTOCREATE
+ " is false). Please create this directory manually.");
}
if (!this.snapDir.mkdirs()) {
throw new DatadirException("Unable to create snap directory "
+ this.snapDir);
}
}
if (!this.snapDir.canWrite()) {
throw new DatadirException("Cannot write to snap directory " + this.snapDir);
}
// check content of transaction log and snapshot dirs if they are two different directories
// See ZOOKEEPER-2967 for more details
if(!this.dataDir.getPath().equals(this.snapDir.getPath())){
//When dataDir and snapDir differ, check that dataDir contains no snapshot files and snapDir contains no transaction log files
checkLogDir();
checkSnapDir();
}
txnLog = new FileTxnLog(this.dataDir);
snapLog = new FileSnap(this.snapDir);
autoCreateDB = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DB_AUTOCREATE,
ZOOKEEPER_DB_AUTOCREATE_DEFAULT));
}
Note that a version-2 subdirectory is created under the supplied datadir and snapdir, with checks that each directory exists (or can be created) and is writable; after that, txnLog and snapLog are created.
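With default settings the on-disk layout then looks like this (the hex suffixes are illustrative: a log file is named after the zxid of its first entry, a snapshot after the last zxid it covers):

dataDir/version-2/log.100000001         <- transaction log files (FileTxnLog)
snapDir/version-2/snapshot.200000003    <- snapshot files (FileSnap)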
2. Initializing ZKDatabase
public ZKDatabase(FileTxnSnapLog snapLog) {
dataTree = new DataTree();
sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
this.snapLog = snapLog;
//... remaining initialization omitted ...
}
This mainly initializes the DataTree and sessionsWithTimeouts. The DataTree constructor creates Zookeeper's own configuration root nodes, such as /, /zookeeper and /zookeeper/quota, which are tied to the Zookeeper server itself.
Data initialization is then performed by ZooKeeperServer.loadData:
public void loadData() throws IOException, InterruptedException {
//On the leader this method is called again from lead(); by then zkDb.isInitialized() == true, so only the snapshot is taken
if(zkDb.isInitialized()){
setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
//First-time initialization
setZxid(zkDb.loadDataBase());
}
//... session expiry cleanup code ...
// Make a clean snapshot
takeSnapshot();
}
public void takeSnapshot() {
takeSnapshot(false);
}
public void takeSnapshot(boolean syncSnap){
txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
//... exception checks omitted ...
}
On first initialization, zkDb.loadDataBase() is called; it ultimately returns the in-memory database's latest transaction id.
public long loadDataBase() throws IOException {
long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
initialized = true;
return zxid;
}
That is, FileTxnSnapLog.restore is called. First, a word about FileTxnSnapLog's inner interface PlayBackListener: it receives callbacks while transactions are being applied. Late in Zookeeper's data recovery there is a transaction correction pass (the deserialization of the incremental data), which calls back PlayBackListener.onTxnLoaded for each applied transaction. The listener passed in here is commitProposalPlaybackListener.
FileTxnSnapLog.restore
//The DataTree dt and Map<Long, Integer> sessions parameters are the in-memory database objects to restore; in fact they are ZKDatabase's own fields
//The PlayBackListener is called back while replaying (correcting) transactions
public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
//Deserialize the snapshot data
long deserializeResult = snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
boolean trustEmptyDB;
File initFile = new File(dataDir.getParent(), "initialize");
if (Files.deleteIfExists(initFile.toPath())) {
LOG.info("Initialize file found, an empty database will not block voting participation");
trustEmptyDB = true;
} else {
trustEmptyDB = autoCreateDB;
}
if (-1L == deserializeResult) {
/* this means that we couldn't find any snapshot, so we need to
* initialize an empty database (reported in ZOOKEEPER-2325) */
if (txnLog.getLastLoggedZxid() != -1) {
throw new IOException(
"No snapshot found, but there are log entries. " +
"Something is broken!");
}
//Trust an empty database by default, since the data is normally empty the first time a server starts
if (trustEmptyDB) {
/* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
* or use Map on save() */
save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false);
/* return a zxid of 0, since we know the database is empty */
return 0L;
} else {
/* return a zxid of -1, since we are possibly missing data */
LOG.warn("Unexpected empty data tree, setting zxid to -1");
dt.lastProcessedZxid = -1L;
return -1L;
}
}
return fastForwardFromEdits(dt, sessions, listener);
}
3. Deserializing the snapshot
Deserialize the snapshot data into the DataTree and sessions: take up to the 100 most recent snapshot files and try them in order until one is found that has data and passes validation (snapLog.deserialize(dt, sessions)); return the largest zxid covered by the snapshot data.
public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
// we run through 100 snapshots (not all of them)
// if we cannot get it running within 100 snapshots
// we should give up
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
boolean foundValid = false;
for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
snap = snapList.get(i);
LOG.info("Reading snapshot " + snap);
try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
deserialize(dt, sessions, ia);
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
foundValid = true;
break;
} catch (IOException e) {
LOG.warn("problem reading snap file " + snap, e);
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
return dt.lastProcessedZxid;
}
A return value of -1 means no snapshot file exists:
if the transaction log's last zxid is also -1, the on-disk data is empty, so the empty database is snapshotted and 0 is returned as the latest transaction id. Otherwise, fastForwardFromEdits is called.
4. Getting the latest zxid
public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
try {
while (true) {
// iterator points to
// the first valid txn when initialized
hdr = itr.getHeader();
if (hdr == null) {
//empty logs
return dt.lastProcessedZxid;
}
if (hdr.getZxid() < highestZxid && highestZxid != 0) {
LOG.error("{}(highestZxid) > {}(next log) for type {}",
highestZxid, hdr.getZxid(), hdr.getType());
} else {
highestZxid = hdr.getZxid();
}
try {
processTransaction(hdr,dt,sessions, itr.getTxn());
} catch(KeeperException.NoNodeException e) {
throw new IOException("Failed to process transaction type: " +
hdr.getType() + " error: " + e.getMessage(), e);
}
listener.onTxnLoaded(hdr, itr.getTxn());
if (!itr.next())
break;
}
} finally {
if (itr != null) {
itr.close();
}
}
return highestZxid;
}
It first obtains a transaction log iterator starting from dt.lastProcessedZxid + 1; these transactions are the incremental data that still has to be applied. The while loop walks through them one by one, keeps updating highestZxid, and finally returns it.
5. Applying transactions
Inside the loop, each transaction is applied by processTransaction, which updates the sessions and the DataTree contents according to the transaction type.
6. Transaction callbacks
listener.onTxnLoaded is called back; this is ZKDatabase's commitProposalPlaybackListener:
private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
public void onTxnLoaded(TxnHeader hdr, Record txn){
addCommittedProposal(hdr, txn);
}
};
private void addCommittedProposal(TxnHeader hdr, Record txn) {
Request r = new Request(0, hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
addCommittedProposal(r);
}
/**
* maintains a list of last <i>committedLog</i>
* or so committed requests. This is used for
* fast follower synchronization.
* @param request committed request
*/
public void addCommittedProposal(Request request) {
WriteLock wl = logLock.writeLock();
try {
wl.lock();
if (committedLog.size() > commitLogCount) {
committedLog.removeFirst();
minCommittedLog = committedLog.getFirst().packet.getZxid();
}
if (committedLog.isEmpty()) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
}
byte[] data = SerializeUtils.serializeRequest(request);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
committedLog.add(p);
maxCommittedLog = p.packet.getZxid();
} finally {
wl.unlock();
}
}
The main logic is in addCommittedProposal, which maintains LinkedList<Proposal> committedLog, storing each incoming incremental transaction; minCommittedLog holds the zxid of the first incremental transaction and maxCommittedLog the zxid of the last. These three fields are what the leader uses to make the fast synchronization decision for followers.
7. Epoch validation
The epoch identifies the current leader's period: every time an election produces a new leader, a new epoch is generated, and servers carry it in all of their mutual communication to ensure they are in the same leader period.
Once the leader finishes data initialization, it puts its currentEpoch and the largest zxid it just recovered into leaderStateSummary, and compares it with the epoch and largest zxid of each learner that connects. The leader's leaderStateSummary must be larger than the learner's StateSummary, which proves the leader's data is at least as new as the learner's, before the leader can open a new epoch and start the data synchronization work.
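These comparisons work because a zxid packs the leader epoch into its high 32 bits and a per-epoch counter into the low 32 bits. A small sketch mirroring ZxidUtils:

//Mirrors org.apache.zookeeper.server.util.ZxidUtils
static long getEpochFromZxid(long zxid) { return zxid >> 32L; }
static long getCounterFromZxid(long zxid) { return zxid & 0xffffffffL; }
static long makeZxid(long epoch, long counter) {
    return (epoch << 32L) | (counter & 0xffffffffL);
}
//e.g. makeZxid(2, 3) is the 3rd transaction of epoch 2; a zxid whose low
//32 bits are all zero marks the very start of an epoch, the same test
//syncFollower below uses as isPeerNewEpochZxid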
Data synchronization between leader and learners
As described in Zookeeper source code analysis (4) - election flow and server startup handling, when the LearnerHandler receives a learner's ACKEPOCH message, leader-learner synchronization begins. The rough process is as follows.
Leader side: sending the sync data
LearnerHandler.run
public void run() {
//... interaction before receiving the ACKEPOCH message omitted ...
//The learner's last zxid
peerLastZxid = ss.getLastZxid();
// Take any necessary action if we need to send TRUNC or DIFF
// startForwarding() will be called in all cases
//Decide whether a full (snapshot) sync is needed
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
LOG.debug("Sending NEWLEADER message to " + sid);
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
//Send the NEWLEADER message
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
.toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
}
//Force flush; this covers the DIFF/TRUNC/TRUNC+DIFF sync modes
bufferedOutput.flush();
/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
//Full sync
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
LearnerSnapshot snapshot =
leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
try {
long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
bufferedOutput.flush();
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
//Force flush; this covers the SNAP sync mode
bufferedOutput.flush();
bufferedOutput.flush();
} finally {
snapshot.close();
}
}
// Start thread that blast packets in the queue to learner
startSendingPackets();
//Wait for the learner's ACK confirming that synchronization finished
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.ACK){
LOG.error("Next packet was supposed to be an ACK,"
+ " but received packet: {}", packetToString(qp));
return;
}
if(LOG.isDebugEnabled()){
LOG.debug("Received NEWLEADER-ACK message from " + sid);
}
leader.waitForNewLeaderAck(getSid(), qp.getZxid());
//Sync-time check: must not exceed tickTime * syncLimit
syncLimitCheck.start();
// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
/*
* Wait until leader starts up
*/
synchronized(leader.zk){
while(!leader.zk.isRunning() && !this.isInterrupted()){
leader.zk.wait(20);
}
}
// Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
LOG.debug("Sending UPTODATE message to " + sid);
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
//... synchronization complete; normal communication with the learner begins ...
}
As noted during the data initialization discussion, the in-memory database zkDatabase keeps the incremental data since the latest snapshot:
LinkedList<Proposal> committedLog:
stores each incremental committed transaction
minCommittedLog:
zxid of the first incremental transaction
maxCommittedLog:
zxid of the last incremental transaction
The leader compares the learner's largest transaction ID, peerLastZxid, against minCommittedLog / maxCommittedLog to decide between differential and full synchronization. The main logic is in syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader):
public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
//Is the learner's zxid a fresh-epoch zxid (low 32 counter bits all zero)?
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
// Keep track of the latest zxid which already queued
long currentZxid = peerLastZxid;
boolean needSnap = true;
//Whether syncing from the on-disk txn log is enabled; it is by default, with snapshotSizeFactor = 0.33
boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
ReentrantReadWriteLock lock = db.getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
long maxCommittedLog = db.getmaxCommittedLog();
long minCommittedLog = db.getminCommittedLog();
long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
if (db.getCommittedLog().isEmpty()) {
/*
* It is possible that committedLog is empty. In that case
* setting these value to the latest txn in leader db
* will reduce the case that we need to handle
*
* Here is how each case handle by the if block below
* 1. lastProcessZxid == peerZxid -> Handle by (2)
* 2. lastProcessZxid < peerZxid -> Handle by (3)
* 3. lastProcessZxid > peerZxid -> Handle by (5)
*/
minCommittedLog = lastProcessedZxid;
maxCommittedLog = lastProcessedZxid;
}
/*
* Here are the cases that we want to handle
*
* 1. Force sending snapshot (for testing purpose)
* 2. Peer and leader is already sync, send empty diff
* 3. Follower has txn that we haven't seen. This may be old leader
* so we need to send TRUNC. However, if peer has newEpochZxid,
* we cannot send TRUNC since the follower has no txnlog
* 4. Follower is within committedLog range or already in-sync.
* We may need to send DIFF or TRUNC depending on follower's zxid
* We always send empty DIFF if follower is already in-sync
* 5. Follower missed the committedLog. We will try to use on-disk
* txnlog + committedLog to sync with follower. If that fail,
* we will send snapshot
*/
if (forceSnapSync) {
// Force leader to use snapshot to sync with follower
LOG.warn("Forcing snapshot sync - should not see this in production");
} else if (lastProcessedZxid == peerLastZxid) {
// Follower is already sync with us, send empty diff
//Queue the packet onto queuedPackets, the queue responsible for shipping messages to the learner
queueOpPacket(Leader.DIFF, peerLastZxid);
needOpPacket = false;
needSnap = false;
} else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
// Newer than committedLog, send trunc and done
queueOpPacket(Leader.TRUNC, maxCommittedLog);
currentZxid = maxCommittedLog;
needOpPacket = false;
needSnap = false;
} else if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
// Follower is within commitLog range
Iterator<Proposal> itr = db.getCommittedLog().iterator();
//Differential sync: send the proposals in (peerLastZxid, maxCommittedLog] to the learner
currentZxid = queueCommittedProposals(itr, peerLastZxid,
null, maxCommittedLog);
needSnap = false;
} else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
// Use txnlog and committedLog to sync
// Calculate sizeLimit that we allow to retrieve txnlog from disk
long sizeLimit = db.calculateTxnLogSizeLimit();
// This method can return empty iterator if the requested zxid
// is older than on-disk txnlog
Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
peerLastZxid, sizeLimit);
if (txnLogItr.hasNext()) {
currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
minCommittedLog, maxCommittedLog);
//Continue from the in-memory committedLog where the on-disk txn log ends
//(this declaration was elided in the original excerpt)
Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
null, maxCommittedLog);
needSnap = false;
}
// closing the resources
if (txnLogItr instanceof TxnLogProposalIterator) {
TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
txnProposalItr.close();
}
} else {
LOG.warn("Unhandled scenario for peer sid: " + getSid());
}
LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
" for peer sid: " + getSid());
//lets the leader know that a follower is capable of following and is done syncing
//Also forward proposals that have been agreed on but not yet committed
leaderLastZxid = leader.startForwarding(this, currentZxid);
} finally {
rl.unlock();
}
//needOpPacket: whether a TRUNC or DIFF packet still needs to be queued; true by default
if (needOpPacket && !needSnap) {
// This should never happen, but we should fall back to sending
// snapshot just in case.
LOG.error("Unhandled scenario for peer sid: " + getSid() +
" fall back to use snapshot");
needSnap = true;
}
return needSnap;
}
Synchronization therefore falls into roughly five modes:
1. Forced snapshot sync
forceSnapSync can be set to true for testing; it defaults to false.
2. Already in sync
The leader's and learner's largest zxids match; nothing needs to be synchronized, and a single empty DIFF message is sent.
3. Rollback sync (TRUNC)
The learner's zxid peerLastZxid is newer than the leader's maxCommittedLog (which equals lastProcessedZxid when committedLog is empty) and is not a fresh-epoch zxid. The learner must discard the transactions the leader does not have, so a TRUNC message is sent: queueOpPacket(Leader.TRUNC, maxCommittedLog);
4. Differential sync (DIFF, or TRUNC+DIFF)
peerLastZxid lies between minCommittedLog and maxCommittedLog. There are two sub-cases, distinguished in LearnerHandler.queueCommittedProposals below:
- If peerLastZxid is not actually present in that range, the learner first rolls back to prevProposalZxid, the closest proposal before peerLastZxid, and the zxids in (prevProposalZxid, maxCommittedLog] are then synchronized (TRUNC+DIFF).
- If peerLastZxid is present in the range, only the zxids in (peerLastZxid, maxCommittedLog] need to be synchronized (plain DIFF).
protected long queueCommittedProposals(Iterator<Proposal> itr,
long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
long queuedZxid = peerLastZxid;
// as we look through proposals, this variable keeps track of previous
// proposal Id.
long prevProposalZxid = -1;
while (itr.hasNext()) {
Proposal propose = itr.next();
long packetZxid = propose.packet.getZxid();
// abort if we hit the limit
if ((maxZxid != null) && (packetZxid > maxZxid)) {
break;
}
// skip the proposals the peer already has
if (packetZxid < peerLastZxid) {
prevProposalZxid = packetZxid;
continue;
}
// If we are sending the first packet, figure out whether to trunc
// or diff
if (needOpPacket) {
// Send diff when we see the follower's zxid in our history (the exact-match DIFF sub-case)
if (packetZxid == peerLastZxid) {
LOG.info("Sending DIFF zxid=0x" +
Long.toHexString(lastCommittedZxid) +
" for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
continue;
}
if (isPeerNewEpochZxid) {
// Send diff and fall through if zxid is of a new-epoch
LOG.info("Sending DIFF zxid=0x" +
Long.toHexString(lastCommittedZxid) +
" for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
} else if (packetZxid > peerLastZxid ) {
// Peer has some proposals that the leader hasn't seen yet (the TRUNC+DIFF sub-case);
// it may have been an old leader
if (ZxidUtils.getEpochFromZxid(packetZxid) !=
ZxidUtils.getEpochFromZxid(peerLastZxid)) {
// We cannot send TRUNC that cross epoch boundary.
// The learner will crash if it is asked to do so.
// We will send a snapshot in those cases.
LOG.warn("Cannot send TRUNC to peer sid: " + getSid() +
" peer zxid is from different epoch" );
return queuedZxid;
}
LOG.info("Sending TRUNC zxid=0x" +
Long.toHexString(prevProposalZxid) +
" for peer sid: " + getSid());
queueOpPacket(Leader.TRUNC, prevProposalZxid);
needOpPacket = false;
}
}
if (packetZxid <= queuedZxid) {
// We can get here, if we don't have op packet to queue
// or there is a duplicate txn in a given iterator
continue;
}
// Since this is already a committed proposal, we need to follow
// it by a commit packet
//Send the PROPOSAL message, which carries the transaction data
queuePacket(propose.packet);
//Send the COMMIT message, which carries only the zxid to commit
queueOpPacket(Leader.COMMIT, packetZxid);
queuedZxid = packetZxid;
}
if (needOpPacket && isPeerNewEpochZxid) {
// We will send DIFF for this kind of zxid in any case. This if-block
// is the catch when our history older than learner and there is
// no new txn since then. So we need an empty diff
LOG.info("Sending DIFF zxid=0x" +
Long.toHexString(lastCommittedZxid) +
" for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
}
return queuedZxid;
}
- If peerLastZxid < minCommittedLog, but the transactions after that position in the on-disk txn log amount to less than snapSize * snapshotSizeFactor of the latest snapshot's size, sync from txnLog + committedLog in two stages:
currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
minCommittedLog, maxCommittedLog);
currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
null, maxCommittedLog);
5. Full sync (SNAP)
If peerLastZxid falls below all the cases above, a full sync is performed: the method returns true, and back in LearnerHandler.run a SNAP message is sent, followed by the whole serialized ZKDatabase.
After that, a thread is started to asynchronously drain the queuedPackets queue, and the leader waits for the learner's ACK confirming that synchronization finished.
Learner side: receiving the sync data
After a learner sends the ACKEPOCH message, it enters the synchronization routine Learner.syncWithLeader (both Follower and Observer call this method):
protected void syncWithLeader(long newLeaderZxid) throws Exception{
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
QuorumVerifier newLeaderQV = null;
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
boolean syncSnapshot = false;
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
snapshotNeeded = false;
}
else if (qp.getType() == Leader.SNAP) {
LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
// The leader is going to dump the database
// db is clear as part of deserializeSnapshot()
zk.getZKDatabase().deserializeSnapshot(leaderIs);
// ZOOKEEPER-2819: overwrite config node content extracted
// from leader snapshot with local config, to avoid potential
// inconsistency of config node content during rolling restart.
if (!QuorumPeerConfig.isReconfigEnabled()) {
LOG.debug("Reset config node content from local config after deserialization of snapshot.");
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
}
String signature = leaderIs.readString("signature");
if (!signature.equals("BenWasHere")) {
LOG.error("Missing signature. Got " + signature);
throw new IOException("Missing signature");
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
// immediately persist the latest snapshot when there is txn log gap
syncSnapshot = true;
} else if (qp.getType() == Leader.TRUNC) {
//we need to truncate the log to the lastzxid of the leader
LOG.warn("Truncating log to get in sync with the leader 0x"
+ Long.toHexString(qp.getZxid()));
boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
// not able to truncate the log
LOG.error("Not able to truncate the log "
+ Long.toHexString(qp.getZxid()));
System.exit(13);
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
}
else {
LOG.error("Got unexpected packet from leader: {}, exiting ... ",
LearnerHandler.packetToString(qp));
System.exit(13);
}
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
zk.createSessionTracker();
long lastQueued = 0;
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
// we need to make sure that we don't take the snapshot twice.
boolean isPreZAB1_0 = true;
//If we are not going to take the snapshot be sure the transactions are not applied in memory
// but written out to the transaction log
boolean writeToTxnLog = !snapshotNeeded;
// we are now going to start getting transactions to apply followed by an UPTODATE
outerLoop:
while (self.isRunning()) {
readPacket(qp);
switch(qp.getType()) {
case Leader.PROPOSAL:
PacketInFlight pif = new PacketInFlight();
pif.hdr = new TxnHeader();
pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
if (pif.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(pif.hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = pif.hdr.getZxid();
if (pif.hdr.getType() == OpCode.reconfig){
SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv, true);
}
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
boolean majorChange = self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(),
qp.getZxid(), true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
if (!writeToTxnLog) {
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM:
case Leader.INFORMANDACTIVATE:
PacketInFlight packet = new PacketInFlight();
packet.hdr = new TxnHeader();
if (qp.getType() == Leader.INFORMANDACTIVATE) {
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
byte[] remainingdata = new byte[buffer.remaining()];
buffer.get(remainingdata);
packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr);
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)packet.rec).getData()));
boolean majorChange =
self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
} else {
packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
// Log warning message if txn comes out-of-order
if (packet.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(packet.hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = packet.hdr.getZxid();
}
if (!writeToTxnLog) {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.UPTODATE:
LOG.info("Learner received UPTODATE message");
if (newLeaderQV!=null) {
boolean majorChange =
self.processReconfig(newLeaderQV, null, null, true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
if (isPreZAB1_0) {
zk.takeSnapshot(syncSnapshot);
self.setCurrentEpoch(newEpoch);
}
self.setZooKeeperServer(zk);
self.adminServer.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
// means this is Zab 1.0
LOG.info("Learner received NEWLEADER message");
if (qp.getData()!=null && qp.getData().length > 1) {
try {
QuorumVerifier qv = self.configFromString(new String(qp.getData()));
self.setLastSeenQuorumVerifier(qv, true);
newLeaderQV = qv;
} catch (Exception e) {
e.printStackTrace();
}
}
if (snapshotNeeded) {
zk.takeSnapshot(syncSnapshot);
}
self.setCurrentEpoch(newEpoch);
writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
zk.startup();
/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
* send leader election notifications to the ensemble.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
*/
self.updateElectionVote(newEpoch);
// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer)zk;
for(PacketInFlight p: packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec);
}
for(Long zxid: packetsCommitted) {
fzk.commit(zxid);
}
} else if (zk instanceof ObserverZooKeeperServer) {
// Similar to follower, we need to log requests between the snapshot
// and UPTODATE
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
Long zxid = packetsCommitted.peekFirst();
if (p.hdr.getZxid() != zxid) {
// log warning message if there is no matching commit
// old leader send outstanding proposal to observer
LOG.warn("Committing " + Long.toHexString(zxid)
+ ", but next proposal is "
+ Long.toHexString(p.hdr.getZxid()));
continue;
}
packetsCommitted.remove();
Request request = new Request(null, p.hdr.getClientId(),
p.hdr.getCxid(), p.hdr.getType(), null, null);
request.setTxn(p.rec);
request.setHdr(p.hdr);
ozk.commitRequest(request);
}
} else {
// New server type need to handle in-flight packets
throw new UnsupportedOperationException("Unknown server type");
}
}
The rough flow: first, the type of the initial message (DIFF, SNAP, or TRUNC) determines how to prepare for synchronization. The learner then keeps reading sync messages until NEWLEADER arrives, sends an ACK to the leader, and waits for the leader's UPTODATE message, which signals that synchronization is complete. It then sends a final ACK so the leader knows as well, starts the zkServer, and begins serving clients.
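Putting the two sides together, the sync handshake can be summarized as:

Leader                                Learner
  |--- DIFF / TRUNC / SNAP ---------->|  choose sync mode, prepare the db
  |--- PROPOSAL / COMMIT ... -------->|  apply or queue the transactions
  |--- NEWLEADER -------------------->|  take a snapshot if needed
  |<-- ACK ---------------------------|
  |--- UPTODATE --------------------->|  synchronization complete
  |<-- ACK ---------------------------|  zk.startup(), start serving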
Thanks for reading! I'm Monica23334 || Monica2333, and I've pledged to write one original article every week. Follow me and hold me to it~