19} 20} 21} 22}
1、內(nèi)存里面的元數(shù)據(jù)刷盤
19}
接上期
——1.1 .1.1》 FSEditLog.logSync()
——1.1 .1.1.1》 EditLogOutputStream.flush()
▼
//第一次:FileJouranlManager -> EditLogFileOutputStream
//第二次:QuorumJounalManager -> QuorumOutputStream
flushAndSync(durable);
↓ 先看這個
——1.1 .1.1.1》EditLogFileOutputStream# flushAndSync(durable)
//TODO 涮寫磁盤
doubleBuf.flushTo(fp);
↓ 再看這個
——1.1 .1.1.2》QuorumOutputStream#flushAndSync
——1.1 .1.1.2.1》AsyncLoggerSet#sendEdits()
//往journalnode去發(fā)送日志。
logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
2、內(nèi)存里面的元數(shù)據(jù)刷盤和journalnode 19}裆熙,20} 10分鐘
3、standby的 namenode 同步 元數(shù)據(jù) by journalnode
21} 40分鐘 22} 18 分鐘
/* EditLogTailer是一個后臺線程,啟動了以后會周期性的去journalnode集群上面去
-
讀取元數(shù)據(jù)日志萤厅,然后再把這些元數(shù)據(jù)日志應(yīng)用到自己的元數(shù)據(jù)里面(內(nèi)存+磁盤)
*/
EditLogTailer類
EditLogTailer.EditLogTailerThread.run()方法
——1》EditLogTailer.EditLogTailerThread.doWork()
//TODO 重要的代碼
doTailEdits();
//TODO 每隔60秒 StandByNameNode 去Journalnode獲取一下日志
Thread.sleep(sleepTimeMs);——1》EditLogTailer.doTailEdits()
▼
//TODO 加載當(dāng)前自己的元數(shù)據(jù)日志
FSImage image = namesystem.getFSImage();
//TODO StandByNamenoe 獲取當(dāng)前的元數(shù)據(jù)日志的最后一條日志的事務(wù)ID是多少
long lastTxnId = image.getLastAppliedTxId();
//這個地方是重要的代碼
//需要去journlanode上面去讀取元數(shù)據(jù)
//現(xiàn)在的事務(wù)id 1000,所以我去journlanode上面去讀取
//日志的時候靴迫,只需要去讀取 1001后面的日志就可以惕味。
//TODO 設(shè)置獲取Journalnode獲取日志的流
streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
//TODO 去Journalnode加載日志
editsLoaded = image.loadEdits(streams, namesystem);
——1.1》FSImage.loadEdits()
▼
//TODO 加載日志
//1000
//1001
//2000
loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
//TODO 記錄最后的一個事務(wù)ID
//1000 -> 2000
lastAppliedTxId = loader.getLastAppliedTxId();
——1.1.1》FSEditLogLoader.loadFSEdits()
//TODO 重要代碼
long numEdits = loadEditRecords(edits, false,
——1.1.1》FSEditLogLoader.loadEditRecords()
▼
//TODO 把獲取到的元數(shù)據(jù)作用到自己的內(nèi)存元數(shù)據(jù)里
long inodeId = applyEditLogOp(op, fsDir, startOpt,
in.getVersion(true), lastInodeId);
——1.1.1.1》FSEditLogLoader.applyEditLogOp()
▼
//TODO 創(chuàng)建目錄的日志
case OP_MKDIR: {
//根據(jù)匹配規(guī)則我們這次的日志
//應(yīng)該是一個創(chuàng)建目錄的日志。
MkdirOp mkdirOp = (MkdirOp)op;
inodeId = getAndUpdateLastInodeId(mkdirOp.inodeId, logVersion,
lastInodeId);
//TODO 把數(shù)據(jù)作用于自己的元數(shù)據(jù)里面玉锌。
FSDirMkdirOp.mkdirForEditLog(fsDir, inodeId,
renameReservedPathsOnUpgrade(mkdirOp.path, logVersion),
mkdirOp.permissions, mkdirOp.aclEntries, mkdirOp.timestamp);
break;
——1.1.1.1》FSDirMkdirOp.mkdirForEditLog()
//TODO 重要代碼
unprotectedMkdir(fsd, inodeId, existing, localName, permissions,
——1.1.1.1》 FSDirMkdirOp.unprotectedMkdir()
▼
//TODO 封裝成一個目錄
final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
//TODO 往文件目錄樹 該添加目錄的地方添加節(jié)點
INodesInPath iip = fsd.addLastINode(parent, dir, true);
▲回到 ——1.1.1》FSEditLogLoader.loadEditRecords
——1.1.1》FSEditLogLoader.loadEditRecords()
▼
try {
/**
* 讀取元數(shù)據(jù)日志(到了journalnode)
* 至于是如何讀取的名挥,我們等一下。
* 2.7.0
*/
op = in.readOp();
——1.1.1.1》EditLogInputStream.readOp()
——1.1.1.1》EditLogInputStream. nextOp()
↓
——1.1.1.1.1》EditLogFileInputStream. nextOp()
★——1.1.1.1.1》EditLogFileInputStream.nextOpImpl()
▼
//TODO 核心方法
init(true);
——1.1.1.1.1》EditLogFileInputStream.init()
/**
* TODO 這兒使用了裝飾模式
*/
reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
▲回到 ——1.1.1.1.1》FSEditLogLoader.nextOpImpl
//TODO 通過reader讀取日志
op = reader.readOp(skipBrokenEdits); 21} 0:28:
注釋: reader 在 init里面初始化 fStream = log.getInputStream();//log是URLLog
回到 ——1.1.1.1.1》EditLogFileInputStream.init()
//所以找URLlog的getInpustream()的方法
fStream = log.getInputStream();
↓
——1.1.1.1.1.1》EditLogFileInputStream.URLLog.getInputStream();
▼
//創(chuàng)建了HttpURLConnetcion
//如果我們這兒發(fā)送的是HTTP的請求主守,讀取的Journalndoe那兒的日志
//說明journalndoe啟動起來的時候肯定會有一個JournalnodeHttpServer
//NameNode: NameNodeRpcServer NameNodeHttperServer
//DataNode: RpcServer Httpserver
//JournalNode: JournalnodeRpcServer JournalnodeHttpserver
//TODO 真相大白禀倔,我們創(chuàng)建了一個HttpURLConnection對象
connection = (HttpURLConnection)
connectionFactory.openConnection(url, isSpnegoEnabled);
//通過這個對象獲取到了輸入流
return connection.getInputStream(); 21} 0:30
↓ JournalNode服務(wù)器接受讀取editlog請求
——1.1.1.1.1.2》 JournalNodeHttpServer.start()
//TODO 綁定了一個servlet /getJournal
httpServer.addInternalServlet("getJournal", "/getJournal",
GetJournalEditServlet.class, true);
//TODO 啟動服務(wù)
httpServer.start();
——1.1.1.1.1.2.1》 GetJournalEditServlet.doGet()
▼
//TODO journalndoe讀取數(shù)據(jù)流
//就是我們平常普通的操作
editFileIn = new FileInputStream(editFile);
//TODO 流對烤
//editFileIn 這個輸入流讀取的是journalnode這兒的日志
//response.getOutputStream() 把數(shù)據(jù)寫到這個輸出流里面
TransferFsImage.copyFileToStream(response.getOutputStream(), editFile, editFileIn, throttler);
↓ //StandbyCheckpointer類做checkpoint
——1.1.1.1.1.2.1》StandbyCheckpointer.CheckpointerThread.run()
- 命名空間 = 元數(shù)據(jù)信息 = 目錄樹 = fsimage *
- StandbyCheckpointer 是一個運行在standBynamenode上的一個線程。
- 他會周期性的對命名空間做checkpoint的操作(說白了就是把 內(nèi)存里面目錄樹的信息持久化到磁盤上面)
- 并且會把這個份數(shù)據(jù)上傳到active namenode(用來替換 active namednoe上面的fsimage)
——1.1.1.1.1.2.1》StandbyCheckpointer.CheckpointerThread.dowork()
▼
//TODO 每隔60檢查以下是否需要做checkpoint
Thread.sleep(checkPeriod);
//TODO checkpoint條件一 數(shù)量 10000
//這兒是計算以下参淫,我們上次checkpoint 現(xiàn)在最新的數(shù)據(jù)差了多少救湖?
//或者說大概的意思就是說我們現(xiàn)在有多少條日志沒有checkpoint了。
final long uncheckpointed = countUncheckpointedTxns();
//TODO checkpoint條件二
//當(dāng)前時間 - 上一次checkpoint的時間涎才。
//說白了這個變量代表的意思就是 已經(jīng)有多久沒有做checkpoint了捎谨。
final long secsSinceLast = (now - lastCheckpointTime) / 1000;
//TODO 執(zhí)行checkpoint
doCheckpoint();
——1.1.1.1.1.2.1.1》StandbyCheckpointer.doCheckpoint()
▼
//TODO 把元數(shù)據(jù)持久化到磁盤上面
img.saveNamespace(namesystem, imageType, canceler);
//開啟了一個異步的線程
ExecutorService executor =
Executors.newSingleThreadExecutor(uploadThreadFactory);
//這個操作就要把剛剛從內(nèi)存里面的元數(shù)據(jù)持持久化到磁盤上面的 那個份數(shù)據(jù) 上傳到 active的namenode上面去。
TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem.getFSImage().getStorage(), imageType, txid, canceler);
4憔维、standby的 namenode 發(fā)送 fsimage 到 主namenode 22} 0:13
流程圖
——1.1.1.1.1.2.1.1.1》TransferFsImage.uploadImageFromStorage()
——1.1.1.1.1.2.1.1.1》TransferFsImage.uploadImage()
——1.1.1.1.1.2.1.1.1》TransferFsImage.writeFileToPutRequest()
▼
//通過http方式獲取的流
OutputStream output = connection.getOutputStream();
//輸入流肯定是自己這兒的涛救,不斷讀自己的數(shù)據(jù)
FileInputStream input = new FileInputStream(imageFile);
try {
//這兒沒有什么特別的,就是一個流對烤
//然后把數(shù)據(jù)網(wǎng)output 輸出流里面去寫业扒。
copyFileToStream(output, imageFile, input,
ImageServlet.getThrottler(conf), canceler);
↓ //NameNodeHttpServer類做上傳 22}
——1.1.1.1.1.2.1.1.2》NameNodeHttpServer.start()
——1.1.1.1.1.2.1.1.2》NameNodeHttpServer. setupServlets()
▼
//TODO 上傳元數(shù)據(jù)的請求
//SecondaryNameNode/StandByNamenode合并出來的FSImage需要替換Active NameNode的fsimage
//發(fā)送的就是http的請求检吆,請求就會轉(zhuǎn)發(fā)給這個servlet
httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
Image'Servlet.class, true);
——1.1.1.1.1.2.1.1.3》ImageServlet.doPut()
▼
//TODO 步驟一:
// 針對請求獲取到一個輸入流,不斷的把數(shù)據(jù)讀取過來
InputStream stream = request.getInputStream();
try {
long start = monotonicNow();
//TODO 步驟二:
MD5Hash downloadImageDigest = TransferFsImage
.handleUploadImageRequest(request, txid,
nnImage.getStorage(), stream,
parsedParams.getFileSize(), getThrottler(conf));
//TODO 步驟三:
// 會把接收過來的元數(shù)據(jù) 替換 現(xiàn)在已有的fsimage文件程储。
//對文件進(jìn)行重命名
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);