As a distributed file system, HDFS is good at reading and writing large files. This comes from the design principle of "separating file metadata from file data, and storing file data in blocks": the namenode manages file metadata, while datanodes manage the blocks of file data.
HDFS 2.x further abstracts the block storage service into block pools, but the block-write process is largely the same as in 1.x. This post assumes a replication factor of 1 (so a block write involves just 1 client + 1 datanode) and no failures, and analyzes how a datanode writes a block.
Source version: Apache Hadoop 2.6.0
You can set breakpoints following my notes from tracing the source and debug through the process yourself.
With a replication factor of 1, a single datanode forms the minimal pipeline; compared with the more common pipelined write, this can be thought of as having "no pipeline". Two follow-up posts will analyze the pipelined write without failures and with failures, respectively.
Before we begin
Overview
From the earlier post 源碼|HDFS之DataNode:?jiǎn)?dòng)過(guò)程, we have a rough picture of the important worker threads on a datanode. Among them, DataXceiverServer and BPServiceActor are the two most closely tied to the block-write process.
As covered in HDFS-1.x、2.x的RPC接口, the client and datanodes complete block reads and writes mainly through the streaming interface DataTransferProtocol. DataTransferProtocol handles the streaming communication between the client and the datanodes along the whole pipeline; within it, DataTransferProtocol#writeBlock() is responsible for writing a block:
public void writeBlock(final ExtendedBlock blk,
    final StorageType storageType,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo source,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    final DataChecksum requestedChecksum,
    final CachingStrategy cachingStrategy,
    final boolean allowLazyPersist) throws IOException;
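On the client side, this call is issued through the Sender class, which serializes the arguments onto the socket to the first datanode in the pipeline. A rough sketch (not verbatim from the client code; the variable names are placeholders for values the client has prepared, and the argument order follows the signature above):

// Rough sketch: the client wraps the socket's DataOutputStream in a Sender,
// which serializes this call as an OpWriteBlockProto on the wire.
new Sender(out).writeBlock(block, StorageType.DEFAULT, accessToken, clientName,
    targets, targetStorageTypes, null /* source */,
    BlockConstructionStage.PIPELINE_SETUP_CREATE, targets.length /* pipelineSize */,
    0L /* minBytesRcvd */, 0L /* maxBytesRcvd */, latestGenerationStamp,
    requestedChecksum, cachingStrategy, false /* allowLazyPersist */);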
How this article is organized:
1. If the analysis involves only a single branch, it stays within one section.
2. If it involves multiple branches, each branch gets its own subsection at the next level.
3. Multi-threaded analysis is handled the same way as multiple branches.
4. Within each branch and each thread, the structure recursively follows rules 1-3.
The DataXceiverServer thread
注意咬腕,DataTransferProtocol并不是一個(gè)RPC協(xié)議欢峰,因此,常見(jiàn)通過(guò)的尋找DataTransferProtocol接口的實(shí)現(xiàn)類(lèi)來(lái)確定“客戶端調(diào)用的遠(yuǎn)程方法”是站不住腳涨共。不過(guò)依然可以按照這個(gè)思路倒追纽帖,看實(shí)現(xiàn)類(lèi)究竟是如何被創(chuàng)建,與誰(shuí)通信举反,來(lái)驗(yàn)證是否找到了正確的實(shí)現(xiàn)類(lèi)懊直。
依靠debug,猴子從DataXceiver類(lèi)反向追到了DataXceiverServer類(lèi)火鼻。這里從DataXceiverServer類(lèi)開(kāi)始室囊,正向講解。
DataXceiverServer線程在DataNode#runDatanodeDaemon()方法中啟動(dòng)魁索。
DataXceiverServer#run():
public void run() {
  Peer peer = null;
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
      peer = peerServer.accept();
      ...// check the number of DataXceiver threads; throw an IOE if over the limit
      // start a new DataXceiver thread
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this))
          .start();
    } catch (SocketTimeoutException ignored) {
      // wake up to see if should continue to run
    } catch (AsynchronousCloseException ace) {
      // another thread closed our listener socket - that's expected during shutdown,
      // but not in other circumstances
      if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
      }
    } catch (IOException ie) {
      ...// cleanup
    } catch (OutOfMemoryError ie) {
      ...// clean up and sleep 30s
    } catch (Throwable te) {
      // on any other exception, shut down the datanode
      LOG.error(datanode.getDisplayName()
          + ":DataXceiverServer: Exiting due to: ", te);
      datanode.shouldRun = false;
    }
  }
  ...// close peerServer and clean up all peers
}
The DataXceiverServer thread is a typical TCP socket server. For each incoming TCP connection from a client, as long as the number of DataXceiver threads on the datanode has not exceeded the limit, it starts a new DataXceiver thread.
The default maximum number of DataXceiver threads is 4096, configurable via dfs.datanode.max.transfer.threads.
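If the limit needs raising, the property can be set like any other HDFS configuration key; a minimal sketch (the same property can of course go into hdfs-site.xml instead):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// Minimal sketch: raise the DataXceiver thread cap above the default 4096.
Configuration conf = new HdfsConfiguration();
conf.setInt("dfs.datanode.max.transfer.threads", 8192);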
Main flow: the DataXceiver thread
DataXceiver#run():
public void run() {
  int opsProcessed = 0;
  Op op = null;
  try {
    ...// some initialization
    // loop so that the client can reuse the TCP connection when sending new operation requests
    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
      try {
        ...// timeout settings
        op = readOp();
      } catch (InterruptedIOException ignored) {
        // Time out while we wait for client rpc
        break;
      } catch (IOException err) {
        ...// thanks to an optimization here, after an operation completes normally an EOFException or ClosedChannelException is always thrown, so we can exit the loop
        ...// any other exception indicates an error; rethrow it to exit the loop
      }
      ...// timeout settings
      opStartTime = now();
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) &&
        (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
  } catch (Throwable t) {
    ...// exception handling
  } finally {
    ...// resource cleanup, including open files, sockets, etc.
  }
}
We won't dwell on that optimization here.
DataXceiver#readOp() is inherited from the Receiver class: it reads the op code from the client's socket to determine which operation the client wants to perform. The op code for writing a block is 80, so the returned enum value is op = Op.WRITE_BLOCK.
DataXceiver#processOp() is also inherited from the Receiver class:
protected final void processOp(Op op) throws IOException {
  switch(op) {
  case READ_BLOCK:
    opReadBlock();
    break;
  case WRITE_BLOCK:
    opWriteBlock(in);
    break;
  ...// other cases
  default:
    throw new IOException("Unknown op " + op + " in data stream");
  }
}
...
private void opWriteBlock(DataInputStream in) throws IOException {
  final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
  final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
  TraceScope traceScope = continueTraceSpan(proto.getHeader(),
      proto.getClass().getSimpleName());
  try {
    writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelper.convertStorageType(proto.getStorageType()),
        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        targets,
        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
        PBHelper.convert(proto.getSource()),
        fromProto(proto.getStage()),
        proto.getPipelineSize(),
        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
        proto.getLatestGenerationStamp(),
        fromProto(proto.getRequestedChecksum()),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
            CachingStrategy.newDefaultStrategy()),
        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
  } finally {
    if (traceScope != null) traceScope.close();
  }
}
Another improvement in HDFS 2.x over 1.x: the streaming interface now mostly uses protobuf for serialization, instead of parsing raw byte streams over TCP.
The Receiver class implements the DataTransferProtocol interface but does not implement DataTransferProtocol#writeBlock(). Polymorphism tells us that the call here dispatches to DataXceiver#writeBlock().
At last, we arrive at DataXceiver#writeBlock():
public void writeBlock(final ExtendedBlock block,
    final StorageType storageType,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientname,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo srcDataNode,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist) throws IOException {
  ...// checks, parameter setup, etc.
  ...// build the output stream for replying to the upstream node or client (here, the client)
  ...// omitted
  try {
    if (isDatanode ||
        stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
      // create a BlockReceiver, ready to receive the block
      blockReceiver = new BlockReceiver(block, storageType, in,
          peer.getRemoteAddressString(),
          peer.getLocalAddressString(),
          stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
          clientname, srcDataNode, datanode, requestedChecksum,
          cachingStrategy, allowLazyPersist);
      storageUuid = blockReceiver.getStorageUuid();
    } else {
      ...// pipeline error recovery
    }
    ...// handling of downstream nodes. With a single datanode there is no downstream node.
    // the first packet sent is empty and only sets up the pipeline. An ack is returned here immediately to indicate whether the pipeline was set up successfully
    // since this datanode has no downstream node, reaching this point means the pipeline has been set up successfully
    if (isClient && !isTransfer) {
      if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
        LOG.info("Datanode " + targets.length +
            " forwarding connect ack to upstream firstbadlink is " +
            firstBadLink);
      }
      BlockOpResponseProto.newBuilder()
          .setStatus(mirrorInStatus)
          .setFirstBadLink(firstBadLink)
          .build()
          .writeDelimitedTo(replyOut);
      replyOut.flush();
    }
    // receive the block (this also forwards it downstream, but here there is no downstream node)
    if (blockReceiver != null) {
      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
          mirrorAddr, null, targets, false);
      ...// block replication
    }
    ...// block recovery
    ...// block replication
  } catch (IOException ioe) {
    LOG.info("opWriteBlock " + block + " received exception " + ioe);
    throw ioe;
  } finally {
    ...// clean up resources
  }
  ...// update metrics
}
A few parameters deserve special mention:
- stage: the block construction stage. Here it is BlockConstructionStage.PIPELINE_SETUP_CREATE.
- isDatanode: whether the write request was initiated by a datanode. If clientname in the request is empty, the request came from a datanode (block replication, for example, is datanode-initiated). Here it is false.
- isClient: whether the write request was initiated by a client; always the opposite of isDatanode. Here it is true.
- isTransfer: whether the write request is for block replication. If stage is BlockConstructionStage.TRANSFER_RBW or BlockConstructionStage.TRANSFER_FINALIZED, the write is for block replication. Here it is false.
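These flags are derived from the arguments near the top of DataXceiver#writeBlock(), roughly as follows (a sketch based on my reading of the 2.6.0 source):

// clientname is empty for datanode-initiated writes (e.g. block replication)
final boolean isDatanode = clientname.length() == 0;
final boolean isClient = !isDatanode;
// the TRANSFER_* stages mean the write is a block transfer (replication)
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
    || stage == BlockConstructionStage.TRANSFER_FINALIZED;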
The next two sections discuss the "preparing to receive the block" and "receiving the block" phases.
Preparing to receive the block: BlockReceiver.<init>()
BlockReceiver.<init>():
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
    final DataInputStream in,
    final String inAddr, final String myAddr,
    final BlockConstructionStage stage,
    final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
    final String clientname, final DatanodeInfo srcDataNode,
    final DataNode datanode, DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist) throws IOException {
  try{
    ...// checks, parameter setup, etc.
    // open files, ready to receive the block
    if (isDatanode) { // block replication and block moves are initiated by datanodes; in this case the block file is created under the tmp directory
      replicaInfo = datanode.data.createTemporary(storageType, block);
    } else {
      switch (stage) {
      // for a client-initiated write request (considering only create, not append), create the block under the rbw directory (block file and meta file; the block is in the RBW state)
      case PIPELINE_SETUP_CREATE:
        replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
        datanode.notifyNamenodeReceivingBlock(
            block, replicaInfo.getStorageUuid());
        break;
      ...// other cases
      default: throw new IOException("Unsupported stage " + stage +
          " while receiving block " + block + " from " + inAddr);
      }
    }
    ...// omitted
    // block replication, block moves, and client-side block creation all boil down to creating a new block file; in these cases isCreate is true
    final boolean isCreate = isDatanode || isTransfer
        || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
    streams = replicaInfo.createStreams(isCreate, requestedChecksum);
    assert streams != null : "null streams!";
    ...// compute the header of the meta file
    // if a new block file has to be created, a new meta file must be created along with it, and the header written
    if (isCreate) {
      BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
    }
  } catch (ReplicaAlreadyExistsException bae) {
    throw bae;
  } catch (ReplicaNotFoundException bne) {
    throw bne;
  } catch(IOException ioe) {
    ...// an IOE usually involves files and other resources, so extra cleanup is needed
  }
}
Despite all the comments above, the block-creation case is simple: just remember that the block file and meta file are created under the rbw directory (e.g., blk_1073741825 and blk_1073741825_1001.meta).
After creating the block under the rbw directory, the datanode also reports the block being received to the namenode via DataNode#notifyNamenodeReceivingBlock(). This method merely puts the block into a buffer; the BPServiceActor thread reports it asynchronously.
We won't expand on that here; a similar method, DataNode#notifyNamenodeReceivedBlock(), is covered later.
Receiving the block: BlockReceiver#receiveBlock()
BlockReceiver#receiveBlock():
void receiveBlock(
    DataOutputStream mirrOut, // output to next datanode
    DataInputStream mirrIn,   // input from next datanode
    DataOutputStream replyOut,  // output to previous datanode
    String mirrAddr, DataTransferThrottler throttlerArg,
    DatanodeInfo[] downstreams,
    boolean isReplaceBlock) throws IOException {
  ...// parameter setup
  try {
    // if this is a client-initiated write request (here, a block create), start a PacketResponder to send acks
    if (isClient && !isTransfer) {
      responder = new Daemon(datanode.threadGroup,
          new PacketResponder(replyOut, mirrIn, downstreams));
      responder.start(); // start thread to processes responses
    }
    // receive packets synchronously, writing the block file and meta file
    while (receivePacket() >= 0) {}
    // at this point the node has received all packets; wait until all acks are sent, then close the responder
    if (responder != null) {
      ((PacketResponder)responder.getRunnable()).close();
      responderClosed = true;
    }
    ...// block replication
  } catch (IOException ioe) {
    if (datanode.isRestarting()) {
      LOG.info("Shutting down for restart (" + block + ").");
    } else {
      LOG.info("Exception for " + block, ioe);
      throw ioe;
    }
  } finally {
    ...// cleanup
  }
}
Receiving packets synchronously: BlockReceiver#receivePacket()
Let's look at BlockReceiver#receivePacket() first.
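Before diving in, it helps to recall the on-wire packet framing that packetReceiver.receiveNextPacket(in) parses. A sketch from my reading of PacketReceiver in 2.6.0 (not the actual implementation):

import java.io.DataInputStream;
import java.io.IOException;

// On-wire layout of one packet:
//   uint32  payloadLen  = 4 + checksums.length + data.length
//   uint16  headerLen
//   byte[]  header      (PacketHeaderProto: seqno, offsetInBlock,
//                        lastPacketInBlock, dataLen, syncBlock)
//   byte[]  checksums
//   byte[]  data
static void readOnePacket(DataInputStream in) throws IOException {
  int payloadLen = in.readInt();
  short headerLen = in.readShort();
  byte[] header = new byte[headerLen];
  in.readFully(header);             // parse with PacketHeaderProto.parseFrom(header)
  byte[] checksumsAndData = new byte[payloadLen - 4];
  in.readFully(checksumsAndData);   // split using dataLen from the parsed header
}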
Strictly speaking, BlockReceiver#receivePacket() receives a packet from upstream and also continues the pipelined write to the downstream node:
private int receivePacket() throws IOException {
  // read the next packet
  packetReceiver.receiveNextPacket(in);
  PacketHeader header = packetReceiver.getHeader();
  ...// omitted
  ...// check the packet header
  long offsetInBlock = header.getOffsetInBlock();
  long seqno = header.getSeqno();
  boolean lastPacketInBlock = header.isLastPacketInBlock();
  final int len = header.getDataLen();
  boolean syncBlock = header.getSyncBlock();
  ...// omitted
  // if neither immediate persistence nor verification of the received data is required, we can immediately delegate to the PacketResponder thread to return a SUCCESS ack, and verify/persist afterwards
  if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }
  ...// pipelined write
  ByteBuffer dataBuf = packetReceiver.getDataSlice();
  ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
  if (lastPacketInBlock || len == 0) { // an empty packet may be a heartbeat or mark the end of the block
    // in both cases we can try to flush the buffered data to disk
    if (syncBlock) {
      flushOrSync(true);
    }
  } else { // otherwise, the packet needs to be persisted
    final int checksumLen = diskChecksum.getChecksumSize(len);
    final int checksumReceivedLen = checksumBuf.capacity();
    ...// if this is the last node in the pipeline, the received packet must be verified before persisting (using the packet's own checksum mechanism)
    ...// if verification fails, delegate to the PacketResponder thread to return an ERROR_CHECKSUM ack
    final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
        && streams.isTransientStorage();
    try {
      long onDiskLen = replicaInfo.getBytesOnDisk();
      if (onDiskLen < offsetInBlock) {
        ...// if the checksum chunk is incomplete, load and adjust the old meta file content so the crc can be recomputed later
        // write the block file
        int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
            + dataBuf.arrayOffset() + dataBuf.position();
        int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
        out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
        // write the meta file
        final byte[] lastCrc;
        if (shouldNotWriteChecksum) {
          lastCrc = null;
        } else if (partialCrc != null) { // the checksum chunk is incomplete (part of it was received earlier)
          ...// recompute the crc
          ...// update lastCrc
          checksumOut.write(buf);
          partialCrc = null;
        } else { // the checksum chunk is complete
          ...// update lastCrc
          checksumOut.write(checksumBuf.array(), offset, checksumLen);
        }
        ...// omitted
      }
    } catch (IOException iex) {
      datanode.checkDiskErrorAsync();
      throw iex;
    }
  }
  // conversely, if immediate persistence or verification of the received data is required, both have been completed by now, so we can delegate to the PacketResponder thread to return a SUCCESS ack
  // if sync was requested, put in queue for pending acks here
  // (after the fsync finished)
  if (responder != null && (syncBlock || shouldVerifyChecksum())) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }
  ...// if the response time has been exceeded, an IN_PROGRESS ack is also sent proactively to prevent timeouts
  ...// throttler
  // before the entire block has been sent, the client may send packets carrying data, and may also send empty packets as heartbeats or to mark the end of the block
  // so when lastPacketInBlock is true, we must not return 0; return a negative value to distinguish this from the cases before the last packet arrives
  return lastPacketInBlock ? -1 : len;
}
...
private boolean shouldVerifyChecksum() {
  // for a client write, only the last node in the pipeline satisfies `mirrorOut == null`
  return (mirrorOut == null || isDatanode || needsChecksumTranslation);
}
BlockReceiver#shouldVerifyChecksum() mainly matters for the pipelined write; since this post has only one datanode, mirrorOut == null always holds.
The code above looks long, but it does only four things:
- Receive the packet
- Verify the packet
- Persist the packet (see the checksum sketch right after this list)
- Delegate ack sending to the PacketResponder thread
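To make the "persist" step concrete: the block file stores the data itself, while the meta file stores one checksum per data chunk. A minimal sketch, assuming the common defaults of CRC32C with 512-byte chunks:

import org.apache.hadoop.util.DataChecksum;

// one 4-byte CRC32C checksum goes into the meta file
// for every 512-byte chunk written to the block file
DataChecksum checksum = DataChecksum.newDataChecksum(
    DataChecksum.Type.CRC32C, 512);
byte[] chunk = new byte[512];         // one data chunk from the packet
checksum.update(chunk, 0, chunk.length);
int crc = (int) checksum.getValue();  // persisted to the meta file
checksum.reset();                     // then reset for the next chunk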
BlockReceiver#receivePacket(), the PacketResponder thread, and PacketResponder#ackQueue form a producer-consumer model. The objects produced and consumed are acks: BlockReceiver#receivePacket() is the producer and the PacketResponder thread is the consumer.
A quick look at PacketResponder#enqueue():
void enqueue(final long seqno, final boolean lastPacketInBlock,
    final long offsetInBlock, final Status ackStatus) {
  final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
      System.nanoTime(), ackStatus);
  if(LOG.isDebugEnabled()) {
    LOG.debug(myString + ": enqueue " + p);
  }
  synchronized(ackQueue) {
    if (running) {
      ackQueue.addLast(p);
      ackQueue.notifyAll();
    }
  }
}
ackQueue is a thread-unsafe LinkedList.
For how to build a producer-consumer model on top of a thread-unsafe container, see implementation 3 in my post Java實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模型.
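Boiled down to its essentials, the pattern looks like this. A minimal, self-contained sketch (my own illustration, not HDFS code) of guarding a thread-unsafe LinkedList with its own monitor, just as ackQueue, enqueue(), waitForAckHead(), and removeAckHead() do:

import java.util.LinkedList;

class AckQueue<T> {
  private final LinkedList<T> queue = new LinkedList<>();

  // producer: enqueue at the tail, then wake up any waiting consumer
  void put(T item) {
    synchronized (queue) {
      queue.addLast(item);
      queue.notifyAll();
    }
  }

  // consumer: wait until the queue is non-empty, then peek at the head
  T peekHead() throws InterruptedException {
    synchronized (queue) {
      while (queue.isEmpty()) {
        queue.wait();
      }
      return queue.getFirst();
    }
  }

  // consumer: remove the head, then wake up anyone waiting on the queue
  void removeHead() {
    synchronized (queue) {
      queue.removeFirst();
      queue.notifyAll();
    }
  }
}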
Sending acks asynchronously: the PacketResponder thread
In contrast to BlockReceiver#receivePacket(), the PacketResponder thread receives acks from the downstream node and continues the response up the pipeline.
PacketResponder#run():
public void run() {
  boolean lastPacketInBlock = false;
  final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
  while (isRunning() && !lastPacketInBlock) {
    long totalAckTimeNanos = 0;
    boolean isInterrupted = false;
    try {
      Packet pkt = null;
      long expected = -2;
      PipelineAck ack = new PipelineAck();
      long seqno = PipelineAck.UNKOWN_SEQNO;
      long ackRecvNanoTime = 0;
      try {
        // if this node is not the last one in the pipeline and the downstream node is healthy, read the ack from downstream
        if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
          ack.readFields(downstreamIn);
          ...// statistics
          ...// OOB handling (ignored for now)
          seqno = ack.getSeqno();
        }
        // if a normal ack was received from downstream, or this node is the last one in the pipeline, consume a pkt from the queue (i.e., the ack enqueued by BlockReceiver#receivePacket())
        if (seqno != PipelineAck.UNKOWN_SEQNO
            || type == PacketResponderType.LAST_IN_PIPELINE) {
          pkt = waitForAckHead(seqno);
          if (!isRunning()) {
            break;
          }
          // the pipelined write uses seqno to keep packets ordered: a node considers a packet correctly received if and only if the seqno correctly received downstream equals the seqno this node has finished processing; the same holds for the upstream node
          expected = pkt.seqno;
          if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
              && seqno != expected) {
            throw new IOException(myString + "seqno: expected=" + expected
                + ", received=" + seqno);
          }
          ...// statistics
          lastPacketInBlock = pkt.lastPacketInBlock;
        }
      } catch (InterruptedException ine) {
        ...// exception handling
      } catch (IOException ioe) {
        ...// exception handling
      }
      ...// exit on interrupt
      // if this is the last packet, transition the block to the FINALIZED state and close the BlockReceiver
      if (lastPacketInBlock) {
        finalizeBlock(startTime);
      }
      // at this point ack.seqno == pkt.seqno necessarily holds; construct a new ack and send it upstream
      sendAckUpstream(ack, expected, totalAckTimeNanos,
          (pkt != null ? pkt.offsetInBlock : 0),
          (pkt != null ? pkt.ackStatus : Status.SUCCESS));
      // the head element has been processed; dequeue it
      // pkt == null holds in only one case: PacketResponder#isRunning() returned false, i.e., the PacketResponder thread is shutting down. In that case there is no need to dequeue, whether or not the queue has elements
      if (pkt != null) {
        removeAckHead();
      }
    } catch (IOException e) {
      ...// exception handling
    } catch (Throwable e) {
      ...// exception handling
    }
  }
  LOG.info(myString + " terminating");
}
To sum up, the core work of the PacketResponder thread is:
- Receive an ack from the downstream node
- Compare ack.seqno with pkt.seqno at the head of the queue
- If they match, construct a new ack and send it upstream
- If it is the last packet, transition the block to the FINALIZED state
Without quite meaning to, we've covered the pipeline-response logic along the way...
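For intuition, an ack on the wire is little more than the packet's seqno plus one status per datanode in the pipeline. A rough sketch of building and sending one upstream, assuming the 2.6.0 PipelineAck API (Status comes from DataTransferProtos):

// one reply per datanode in the (here, single-node) pipeline
Status[] replies = new Status[] { Status.SUCCESS };
PipelineAck replyAck = new PipelineAck(expected, replies);
replyAck.write(upstreamOut);   // serialized as a vint-prefixed PipelineAckProto
upstreamOut.flush();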
A quick look at the head-peeking and dequeue methods used by the PacketResponder thread:
// peek at the queue head
Packet waitForAckHead(long seqno) throws InterruptedException {
  synchronized(ackQueue) {
    while (isRunning() && ackQueue.size() == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(myString + ": seqno=" + seqno +
            " waiting for local datanode to finish write.");
      }
      ackQueue.wait();
    }
    return isRunning() ? ackQueue.getFirst() : null;
  }
}
...
// dequeue
private void removeAckHead() {
  synchronized(ackQueue) {
    ackQueue.removeFirst();
    ackQueue.notifyAll();
  }
}
Elements enter at the tail and leave from the head.
- After each peek, if the queue is non-empty, then as long as nothing is dequeued, the queue stays non-empty and the head element stays unchanged.
- The first dequeue after a peek therefore removes exactly the element just seen at the head.
We still need to look at PacketResponder#finalizeBlock():
private void finalizeBlock(long startTime) throws IOException {
  // close the BlockReceiver and clean up resources
  BlockReceiver.this.close();
  ...// log
  block.setNumBytes(replicaInfo.getNumBytes());
  // closing the block on the datanode side is delegated to FsDatasetImpl#finalizeBlock()
  datanode.data.finalizeBlock(block);
  // closing the block on the namenode side is delegated to DataNode#closeBlock()
  datanode.closeBlock(
      block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
  ...// log
}
Closing the block from the datanode's perspective: FsDatasetImpl#finalizeBlock()
FsDatasetImpl#finalizeBlock():
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
  if (Thread.interrupted()) {
    // Don't allow data modifications from interrupted threads
    throw new IOException("Cannot finalize block from Interrupted Thread");
  }
  ReplicaInfo replicaInfo = getReplicaInfo(b);
  if (replicaInfo.getState() == ReplicaState.FINALIZED) {
    // this is legal, when recovery happens on a file that has
    // been opened for append but never modified
    return;
  }
  finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
...
private synchronized FinalizedReplica finalizeReplica(String bpid,
    ReplicaInfo replicaInfo) throws IOException {
  FinalizedReplica newReplicaInfo = null;
  if (replicaInfo.getState() == ReplicaState.RUR &&
      ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
      ReplicaState.FINALIZED) { // block recovery (omitted)
    newReplicaInfo = (FinalizedReplica)
        ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
  } else {
    FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
    // recall the BlockReceiver.<init>() analysis: the block we created is in the RBW state, and its block file lives in the rbw directory (actually, where it lives doesn't matter; see below)
    File f = replicaInfo.getBlockFile();
    if (v == null) {
      throw new IOException("No volume for temporary file " + f +
          " for block " + replicaInfo);
    }
    // perform the state transition of the block file and meta file on the volume FsVolumeImpl
    File dest = v.addFinalizedBlock(
        bpid, replicaInfo, f, replicaInfo.getBytesReserved());
    // this replica now represents the final block replica, in the FINALIZED state
    newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
    ...// omitted
  }
  volumeMap.add(bpid, newReplicaInfo);
  return newReplicaInfo;
}
FsVolumeImpl#addFinalizedBlock():
File addFinalizedBlock(String bpid, Block b,
    File f, long bytesReservedForRbw)
    throws IOException {
  releaseReservedSpace(bytesReservedForRbw);
  return getBlockPoolSlice(bpid).addBlock(b, f);
}
Remember the relationship between FsVolumeImpl and BlockPoolSlice from the datanode startup analysis? Here the operation is delegated further to BlockPoolSlice#addBlock(); as it shows, a BlockPoolSlice manages only blocks in the FINALIZED state:
File addBlock(Block b, File f) throws IOException {
  File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
  if (!blockDir.exists()) {
    if (!blockDir.mkdirs()) {
      throw new IOException("Failed to mkdirs " + blockDir);
    }
  }
  File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
  ...// statistics
  return blockFile;
}
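DatanodeUtil.idToBlockDir() maps a block ID to a two-level subdirectory under finalized/, so that no single directory grows too large. A sketch of the 2.6.0 logic (the layout introduced by HDFS-6482):

public static File idToBlockDir(File root, long blockId) {
  // two directory levels, each derived from one byte of the block ID
  int d1 = (int) ((blockId >> 16) & 0xff);
  int d2 = (int) ((blockId >> 8) & 0xff);
  String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
      DataStorage.BLOCK_SUBDIR_PREFIX + d2;   // e.g. "subdir18/subdir37"
  return new File(root, path);
}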
BlockPoolSlice in turn relies on the static helper FsDatasetImpl.moveBlockFiles():
static File moveBlockFiles(Block b, File srcfile, File destdir)
    throws IOException {
  final File dstfile = new File(destdir, b.getBlockName());
  final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
  final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
  try {
    NativeIO.renameTo(srcmeta, dstmeta);
  } catch (IOException e) {
    throw new IOException("Failed to move meta file for " + b
        + " from " + srcmeta + " to " + dstmeta, e);
  }
  try {
    NativeIO.renameTo(srcfile, dstfile);
  } catch (IOException e) {
    throw new IOException("Failed to move block file for " + b
        + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
  }
  ...// logging
  return dstfile;
}
The block file and meta file are simply moved from their original directory (the rbw directory, matching the RBW state) to the finalized directory (matching the FINALIZED state); since both directories live on the same volume, the move is a cheap rename.
At this point, the block write on the datanode side is complete.
However, the metadata on the namenode has not been updated yet, so the datanode still has to report the received block to the namenode.
- Thread safety is guaranteed by FsDatasetImpl#finalizeReplica() (note that it is synchronized).
- Nothing in the FsDatasetImpl#finalizeReplica() flow cares where the block originally lived; the state-transition logic itself guarantees correctness.
Closing the block from the namenode's perspective: DataNode#closeBlock()
DataNode#closeBlock():
void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
  metrics.incrBlocksWritten();
  BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
  if(bpos != null) {
    // report the received block to the namenode
    bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
  } else {
    LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
        + block.getBlockPoolId());
  }
  // add the new block to blockScanner's scan scope (not discussed here)
  FsVolumeSpi volume = getFSDataset().getVolume(block);
  if (blockScanner != null && !volume.isTransientStorage()) {
    blockScanner.addBlock(block);
  }
}
BPOfferService#notifyNamenodeReceivedBlock():
void notifyNamenodeReceivedBlock(
    ExtendedBlock block, String delHint, String storageUuid) {
  checkBlock(block);
  // received blocks (additions) and deleted blocks (removals) are reported together, both wrapped as ReceivedDeletedBlockInfo
  ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
      block.getLocalBlock(),
      ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
      delHint);
  // each BPServiceActor reports to the namenode it is responsible for
  for (BPServiceActor actor : bpServices) {
    actor.notifyNamenodeBlock(bInfo, storageUuid, true);
  }
}
BPServiceActor#notifyNamenodeBlock():
void notifyNamenodeBlock(ReceivedDeletedBlockInfo bInfo,
    String storageUuid, boolean now) {
  synchronized (pendingIncrementalBRperStorage) {
    // update pendingIncrementalBRperStorage
    addPendingReplicationBlockInfo(
        bInfo, dn.getFSDataset().getStorage(storageUuid));
    // sendImmediateIBR is a volatile variable controlling whether to send an incremental block report (IBR) immediately
    sendImmediateIBR = true;
    // now is passed in as true, so all threads blocked on pendingIncrementalBRperStorage are woken up next
    if (now) {
      pendingIncrementalBRperStorage.notifyAll();
    }
  }
}
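For reference, pendingIncrementalBRperStorage is declared in BPServiceActor roughly as follows (a sketch from my reading of the 2.6.0 source): one pending incremental report per storage, with the map itself doubling as the monitor the actor thread waits on.

// guarded by synchronized (pendingIncrementalBRperStorage); also the monitor
// the BPServiceActor thread waits on between heartbeats
private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
    pendingIncrementalBRperStorage = Maps.newHashMap();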
The core of this method is pendingIncrementalBRperStorage, which holds the blocks received and deleted between two reports. It is a buffer: once the received block has been placed into the buffer, the notification is considered done (though not necessarily successful); another thread reads the buffer and reports to the namenode asynchronously.
I haven't read that much source code, but this buffer design shows up all over HDFS and YARN. A buffer decouples the two sides, which not only improves extensibility but also lets each end work at its own rate and scale. With pendingIncrementalBRperStorage, the producer drops blocks in sporadically and irregularly, while the consumer can process them periodically and in batches. As long as some timeliness guarantee holds, batched reporting relieves RPC pressure.
With an IDE it is easy to find that only the BPServiceActor thread, which sends heartbeats to its namenode, blocks on pendingIncrementalBRperStorage. How that thread performs the actual report is analyzed below.
PacketResponder#close()
From the analysis of BlockReceiver#receivePacket() and the PacketResponder thread: by the time the node has received all packets, the acks may not all have been sent yet.
Therefore, PacketResponder#close() is called to wait until all acks are sent before shutting down the responder:
public void close() {
  synchronized(ackQueue) {
    // a non-empty ackQueue means some acks have not been sent yet
    while (isRunning() && ackQueue.size() != 0) {
      try {
        ackQueue.wait();
      } catch (InterruptedException e) {
        running = false;
        Thread.currentThread().interrupt();
      }
    }
    if(LOG.isDebugEnabled()) {
      LOG.debug(myString + ": closing");
    }
    // notify the PacketResponder thread blocked in PacketResponder#waitForAckHead(), so it observes the shutdown condition
    running = false;
    ackQueue.notifyAll();
  }
  // ???
  synchronized(this) {
    running = false;
    notifyAll();
  }
}
I don't understand what the final synchronized(this) block (marked // ??? above) is for... explanations welcome.
The BPServiceActor thread
As noted earlier, we now analyze how the BPServiceActor thread reads the pendingIncrementalBRperStorage buffer and performs the actual report.
BPServiceActor#offerService() calls pendingIncrementalBRperStorage#wait(). Since blocking and wake-ups are involved, the flow cannot be analyzed linearly, so we start from where the thread wakes up:
// if no report is needed right now, wait for a while
long waitTime = dnConf.heartBeatInterval -
    (Time.now() - lastHeartbeat);
synchronized(pendingIncrementalBRperStorage) {
  if (waitTime > 0 && !sendImmediateIBR) {
    try {
      // the BPServiceActor thread wakes up here, then leaves the synchronized block
      pendingIncrementalBRperStorage.wait(waitTime);
    } catch (InterruptedException ie) {
      LOG.warn("BPOfferService for " + this + " interrupted");
    }
  }
} // synchronized
Readers of my post 條件隊(duì)列大法好:使用wait、notify和notifyAll的正確姿勢(shì) might object that this if(){wait} pattern is incorrect. Revisit the "version2: premature wakeup" section of that post and, keeping HDFS's heartbeat mechanism in mind, think about why the pattern is fine here; in fact, this is exactly how it should be written here.
If no report is currently needed, the BPServiceActor thread waits for a while; it is precisely this wait that gives the wake-up in BPServiceActor#notifyNamenodeBlock() its meaning.
Once woken up, the BPServiceActor thread continues its heartbeat loop:
while (shouldRun()) {
  try {
    final long startTime = now();
    if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
Assume the heartbeat interval has not yet elapsed, so the if block is not executed.
Now the volatile variable sendImmediateIBR, set in BPServiceActor#notifyNamenodeBlock(), comes into play:
// if sendImmediateIBR is true, immediately report received and deleted blocks
if (sendImmediateIBR ||
    (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
  // report received and deleted blocks
  reportReceivedDeletedBlocks();
  // update lastDeletedReport
  lastDeletedReport = startTime;
}
// then another, full block report
List<DatanodeCommand> cmds = blockReport();
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
// handle the commands returned by the namenode
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
Interestingly, received and deleted blocks are first reported separately, via an RPC that does not wait for the namenode's return value; then a full report follows, which does wait for the RPC return value.
So although additions and deletions are reported incrementally, an incremental report is necessarily followed by a full report, which keeps the cost of incremental reporting quite high. To improve concurrency, BPServiceActor#notifyNamenodeBlock() returns as soon as it has modified the buffer, without caring whether the report succeeds. Nor does a failed report have bad consequences: before the report, the block has already been transitioned to FINALIZED, persisted to disk, and recorded in the buffer. If the report fails it can be retried later; if the datanode dies before reporting, it re-reports after restart. Consistency is guaranteed either way.
Leaving the full-report logic aside for now, look at the standalone BPServiceActor#reportReceivedDeletedBlocks():
private void reportReceivedDeletedBlocks() throws IOException {
  // build the reports and reset sendImmediateIBR to false
  ArrayList<StorageReceivedDeletedBlocks> reports =
      new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
  synchronized (pendingIncrementalBRperStorage) {
    for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
        pendingIncrementalBRperStorage.entrySet()) {
      final DatanodeStorage storage = entry.getKey();
      final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
      if (perStorageMap.getBlockInfoCount() > 0) {
        ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
        reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
      }
    }
    sendImmediateIBR = false;
  }
  // if the reports are empty, just return
  if (reports.size() == 0) {
    return;
  }
  // otherwise, send the reports via RPC to the namenode this actor is responsible for
  boolean success = false;
  try {
    bpNamenode.blockReceivedAndDeleted(bpRegistration,
        bpos.getBlockPoolId(),
        reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
    success = true;
  } finally {
    // if the report failed, put the block add/delete info back into the buffer to be re-reported
    if (!success) {
      synchronized (pendingIncrementalBRperStorage) {
        for (StorageReceivedDeletedBlocks report : reports) {
          PerStoragePendingIncrementalBR perStorageMap =
              pendingIncrementalBRperStorage.get(report.getStorage());
          perStorageMap.putMissingBlockInfos(report.getBlocks());
          sendImmediateIBR = true;
        }
      }
    }
  }
}
Two points to note:
- Whether a namenode is active or standby, the BPServiceActor thread reports to it (although commands from a standby namenode are ignored).
- If success ends up false, the namenode may in fact have already received the report, so putting the info back into the buffer can cause a duplicate report. That is harmless, in both directions:
  - Re-reporting a deleted block: the namenode finds it holds no info for this block, concludes the block was already deleted, and ignores the info.
  - Re-reporting a received block: the namenode finds the newly received block identical to the block info it already stores, and likewise ignores the info.
Summary
1 client + 1 datanode form the minimal pipeline. This post walked through the failure-free block-write process over that minimal pipeline; with this in hand, analyzing the pipelined write, with or without failures, becomes much easier.
Permalink: 源碼|HDFS之DataNode:寫(xiě)數(shù)據(jù)塊(1)
Author: 猴子007
Source: https://monkeysayhi.github.io
This work is licensed under the Creative Commons Attribution-ShareAlike 4.0 International license. Feel free to repost, adapt, or use it commercially, as long as the attribution and the link to this post are kept.