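This article covers the following:
① How the pipeline is established, and how downstream nodes send acks to upstream nodes
② How DFSOutputStream and DataStreamer work
③ How Sender, BlockReceiver, and PacketResponder work
By way of introduction, let's start at the very top of the stack:
When we create and write a file through the HDFS API, we first call FileSystem's create method to obtain an FSDataOutputStream, and then simply write data through that stream.
The API looks simple, but a great deal happens behind it. The client makes RPC calls to create the file in the NameNode's file system namespace and to addBlock; the NameNode returns the file's LocatedBlock to the client; the client creates an output stream based on the LocatedBlock; the DataNodes build a pipeline; the client sends packets into the pipeline; Sender sends data to DataXceiver; and so on.
Let's dig in.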
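1. Overview of the write pipeline
We skip the step where the client calls RPC to add a block and the NameNode chooses the best list of DataNodes to store it, and go straight to the point where the DataNode list has been chosen and the client writes data to the DataNodes in that list.
This step requires a pipeline between the client and the DataNodes in the list, as shown in the figure below:
Data is wrapped in DFSPacket objects. A block may consist of many packets; for the exact packet format, see the comments in DFSPacket.java in the HDFS source.
The client sends a packet to the first DataNode in the pipeline, A; A forwards the data to B, and B forwards it to C. Once the most downstream node has received the packet, an ack for the packet is sent back in the direction opposite to the data flow. The ack carries the packet's sequence number, which increases monotonically on the client side. The figure above takes the viewpoint of a single packet. For the viewpoint of multiple packets, the figure below is taken from the HDFS paper by Yahoo:
Overall, the write pipeline is easy to understand, but the code behind it is extensive and involves many thread classes on both the client side and the DataNode side. In the rest of this article we work through them one by one.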
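The data and ack directions described above can be sketched with a toy model. This is not HDFS code: PipelineNode and PipelineSketch are hypothetical names, and the call is synchronous, whereas the real pipeline streams many packets concurrently over sockets.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one pipeline node; hypothetical, not HDFS code. */
class PipelineNode {
  final String name;
  final PipelineNode downstream;   // null for the last node in the pipeline
  final List<String> log;

  PipelineNode(String name, PipelineNode downstream, List<String> log) {
    this.name = name;
    this.downstream = downstream;
    this.log = log;
  }

  /** Receives a packet, forwards it downstream, and returns the acked seqno. */
  long receive(long seqno) {
    log.add(name + " recv " + seqno);
    long ack = (downstream == null) ? seqno : downstream.receive(seqno);
    log.add(name + " ack " + ack);   // acks travel opposite to the data
    return ack;
  }
}

class PipelineSketch {
  /** Sends one packet through Client -> A -> B -> C and returns the event log. */
  static List<String> sendOne(long seqno) {
    List<String> log = new ArrayList<>();
    PipelineNode c = new PipelineNode("C", null, log);
    PipelineNode b = new PipelineNode("B", c, log);
    PipelineNode a = new PipelineNode("A", b, log);
    long ack = a.receive(seqno);
    log.add("client ack " + ack);
    return log;
  }

  public static void main(String[] args) {
    sendOne(0).forEach(System.out::println);
  }
}
```

The log shows the packet reaching A, B, and C in that order, and the ack passing C, B, A, and finally the client.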
2. Obtaining the DFSOutputStream
First, let's see how DFSClient obtains the stream used for writing data, DFSOutputStream:
/**
* Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
* Progressable, int, ChecksumOpt, InetSocketAddress[])} with the addition of
* ecPolicyName that is used to specify a specific erasure coding policy
* instead of inheriting any policy from this new file's parent directory.
* This policy will be persisted in HDFS. A value of null means inheriting
* parent groups' whatever policy.
*/
public DFSOutputStream create(String src, FsPermission permission,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
String ecPolicyName) throws IOException {
checkOpen();
final FsPermission masked = applyUMask(permission);
LOG.debug("{}: masked={}", src, masked);
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes), ecPolicyName);
beginFileLease(result.getFileId(), result);
return result;
}
The DFSOutputStream object is obtained through DFSOutputStream#newStreamForCreate. There are many parameters; note that this is the DFSClient object, and that favoredNodes is the list of DataNodes to prefer.
newStreamForCreate does two main things:
① It calls create on the NameNode proxy object to create a file in the NameNode's file system, and obtains the file's metadata as an HdfsFileStatus object.
② It passes the HdfsFileStatus from ① into the DFSOutputStream constructor to build a DFSOutputStream, and then starts the DataStreamer thread inside that DFSOutputStream.
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress,
DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
throws IOException {
try (TraceScope ignored =
dfsClient.newPathTraceScope("newStreamForCreate", src)) {
HdfsFileStatus stat = null;
// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
try {
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
break;
} catch (RemoteException re) {
IOException e = re.unwrapRemoteException(
AccessControlException.class,
DSQuotaExceededException.class,
QuotaByStorageTypeExceededException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
ParentNotDirectoryException.class,
NSQuotaExceededException.class,
RetryStartFileException.class,
SafeModeException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class,
UnknownCryptoProtocolVersionException.class);
if (e instanceof RetryStartFileException) {
if (retryCount > 0) {
shouldRetry = true;
retryCount--;
} else {
throw new IOException("Too many retries because of encryption" +
" zone operations", e);
}
} else {
throw e;
}
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
// Construct a DFSOutputStream object
final DFSOutputStream out;
if(stat.getErasureCodingPolicy() != null) {
out = new DFSStripedOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
} else {
out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes, true);
}
// Start the DataStreamer thread
out.start();
return out;
}
}
Let's follow the DFSOutputStream constructor:
It also does two main things:
① It sets DFSOutputStream member fields, such as the fileId, src, the DFSClient object, the packet size, and so on.
② It creates the DataStreamer thread (the caller is the one that starts it).
/** Construct a new output stream for creating a file. */
protected DFSOutputStream(DFSClient dfsClient, String src,
HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
this(dfsClient, src, flag, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum);
if (createStreamer) {
streamer = new DataStreamer(stat, null, dfsClient, src, progress,
checksum, cachingStrategy, byteArrayManager, favoredNodes,
addBlockFlags);
}
}
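At this point the trail seems to go cold. Where do we look next? The run method of the DataStreamer thread. To keep the narrative flowing I reordered the paragraphs: the part below about writing data to the output stream was written after I had read the DataStreamer source.
The DataStreamer thread takes packets from the dataQueue, sends each one to the first DataNode in the pipeline, and then moves it from the dataQueue to the ackQueue. So we need to see how packets get added to the dataQueue. A global search for dataQueue.add shows that dataQueue.addLast is the method we want. Following its call hierarchy:
In the end you will find that data is wrapped into DFSPacket objects and enqueued in the dataQueue, to wait for DataStreamer to send it to the DataNode, only when FSOutputSummer#write is called.
DFSOutputStream is a subclass of FSOutputSummer and does not override write. So our write call ends up in FSOutputSummer's write, which wraps the data in DFSPacket objects and enqueues them in the dataQueue.
Notes
① When we use the HDFS API, the stream object is of type FSDataOutputStream, but the underlying stream is still a DFSOutputStream. FSDataOutputStream is a wrapper stream: DFSClient#createWrappedOutputStream wraps the DFSOutputStream in an FSDataOutputStream.
② When the stream is constructed, the maximum packet size is derived from configuration. While data is being written, the stream (being a subclass of FSOutputSummer) tracks the number of bytes written so far; once that reaches the configured limit, it builds a DFSPacket and enqueues it in the dataQueue for the DataStreamer thread to process.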
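The buffering behaviour in note ② can be sketched as follows. This is a simplified illustration under stated assumptions, not the FSOutputSummer implementation: PacketizerSketch is a hypothetical class, packets are plain byte arrays, and checksum chunks are ignored entirely.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Toy packetizer; hypothetical, ignores checksums and packet headers. */
class PacketizerSketch {
  private final byte[] buf;        // holds at most one packet's worth of data
  private int count = 0;           // bytes buffered so far
  final Deque<byte[]> dataQueue = new ArrayDeque<>();

  PacketizerSketch(int maxPacketSize) {
    this.buf = new byte[maxPacketSize];
  }

  /** Buffers the bytes; every time the buffer fills, one packet is enqueued. */
  void write(byte[] data) {
    for (byte b : data) {
      buf[count++] = b;
      if (count == buf.length) {
        enqueuePacket();
      }
    }
  }

  /** Enqueues whatever is buffered as a final, possibly short, packet. */
  void flush() {
    if (count > 0) {
      enqueuePacket();
    }
  }

  private void enqueuePacket() {
    dataQueue.addLast(Arrays.copyOf(buf, count));
    count = 0;
  }
}
```

With a maximum packet size of 4, writing 10 bytes enqueues two full packets, and flush() adds a third packet holding the remaining 2 bytes.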
3. The DataStreamer class
DataStreamer is the core class for client-side writes. It is essentially a Thread. Its JavaDoc describes its main responsibilities in detail, so I quote it here:
/*********************************************************************
*
* The DataStreamer class is responsible for sending data packets to the
* datanodes in the pipeline. It retrieves a new blockid and block locations
* from the namenode, and starts streaming packets to the pipeline of
* Datanodes. Every packet has a sequence number associated with
* it. When all the packets for a block are sent out and acks for each
* if them are received, the DataStreamer closes the current block.
*
* The DataStreamer thread picks up packets from the dataQueue, sends it to
* the first datanode in the pipeline and moves it from the dataQueue to the
* ackQueue. The ResponseProcessor receives acks from the datanodes. When an
* successful ack for a packet is received from all datanodes, the
* ResponseProcessor removes the corresponding packet from the ackQueue.
*
* In case of error, all outstanding packets are moved from ackQueue. A new
* pipeline is setup by eliminating the bad datanode from the original
* pipeline. The DataStreamer now starts sending packets from the dataQueue.
*
*********************************************************************/
To restate it in plain terms:
DataStreamer is responsible for sending packets to the DataNodes in the pipeline. It obtains a new block ID and the block's locations from the NameNode, and then streams packets into the pipeline formed by those DataNodes. Every packet carries a sequence number. Once all packets of a block have been sent and an ack has been received for each of them, DataStreamer closes the current block.
The DataStreamer thread takes a packet from the dataQueue, sends it to the first DataNode in the pipeline, and moves it from the dataQueue to the ackQueue. The ResponseProcessor thread receives the responses from the DataNodes. When a successful ack for a packet has arrived from all DataNodes, ResponseProcessor removes that packet from the ackQueue.
On error, all outstanding packets are removed from the ackQueue (they are put back at the front of the dataQueue so that they can be resent). A new pipeline is then built from the original one with the bad DataNode removed, and DataStreamer resumes sending packets from the dataQueue.
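The interplay of the two queues can be modelled in a few lines. This is a sketch only: TwoQueueSketch and its method names are hypothetical, packets are reduced to their sequence numbers, and the error path assumes that un-acked packets are moved back to the front of the dataQueue for resending.

```java
import java.util.LinkedList;

/** Toy model of dataQueue and ackQueue; hypothetical, not DataStreamer itself. */
class TwoQueueSketch {
  final LinkedList<Long> dataQueue = new LinkedList<>();  // waiting to be sent
  final LinkedList<Long> ackQueue = new LinkedList<>();   // sent, not yet acked

  /** Streamer side: "send" the head packet and park it until it is acked. */
  synchronized long sendNext() {
    long seqno = dataQueue.removeFirst();
    ackQueue.addLast(seqno);
    return seqno;
  }

  /** Responder side: a successful ack releases the oldest in-flight packet. */
  synchronized void onAck(long seqno) {
    if (ackQueue.isEmpty() || ackQueue.getFirst() != seqno) {
      throw new IllegalStateException("unexpected ack " + seqno);
    }
    ackQueue.removeFirst();
  }

  /** Error path: un-acked packets return to the front, keeping their order. */
  synchronized void onPipelineError() {
    dataQueue.addAll(0, ackQueue);
    ackQueue.clear();
  }
}
```

For example, with packets 0, 1, 2 queued, sending two of them, acking packet 0, and then hitting an error leaves the dataQueue holding 1 and 2 again and the ackQueue empty.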
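Now that the JavaDoc has told us what DataStreamer does, let's look at the flow chart and some key source code.
At a high level, the DataStreamer#run method executes as shown in the figure below:
Since it is a thread class, we only need to read its run method. The method is long, around 200 lines. You don't need to read all of it; just focus on the questions I raise in the code comments, each of which I answer afterwards: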
/*
* streamer thread is the only thread that opens streams to datanode,
* and closes them. Any error recovery is also done by this thread.
*/
@Override
public void run() {
long lastPacket = Time.monotonicNow();
TraceScope scope = null;
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
if (errorState.hasError()) {
closeResponder();
}
DFSPacket one;
try {
// process datanode IO errors if any
boolean doSleep = processDatanodeOrExternalError();
final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
synchronized (dataQueue) {
// wait for a packet to be sent.
long now = Time.monotonicNow();
// ① What are halfSocketTimeout and timeout about?
while ((!shouldStop() && dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
now - lastPacket < halfSocketTimeout)) || doSleep) {
long timeout = halfSocketTimeout - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
try {
dataQueue.wait(timeout);
} catch (InterruptedException e) {
LOG.warn("Caught exception", e);
}
doSleep = false;
now = Time.monotonicNow();
}
if (shouldStop()) {
continue;
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
} else {
try {
backOffIfNecessary();
} catch (InterruptedException e) {
LOG.warn("Caught exception", e);
}
one = dataQueue.getFirst(); // regular data packet
SpanId[] parents = one.getTraceParents();
if (parents.length > 0) {
scope = dfsClient.getTracer().
newScope("dataStreamer", parents[0]);
scope.getSpan().setParents(parents);
}
}
}
// get new block from namenode.
if (LOG.isDebugEnabled()) {
LOG.debug("stage=" + stage + ", " + this);
}
// ② How is the pipeline established, and how are the sockets created?
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block: {}", this);
setPipeline(nextBlockOutputStream());
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
LOG.debug("Append to block {}", block);
setupPipelineForAppendOrRecovery();
if (streamerClosed) {
continue;
}
initDataStreaming();
}
long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (lastByteOffsetInBlock > stat.getBlockSize()) {
throw new IOException("BlockSize " + stat.getBlockSize() +
" < lastByteOffsetInBlock, " + this + ", " + one);
}
if (one.isLastPacketInBlock()) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
LOG.warn("Caught exception", e);
}
}
}
if (shouldStop()) {
continue;
}
stage = BlockConstructionStage.PIPELINE_CLOSE;
}
// send the packet
SpanId spanId = SpanId.INVALID;
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
if (scope != null) {
spanId = scope.getSpanId();
scope.detach();
one.setTraceScope(scope);
}
scope = null;
dataQueue.removeFirst();
ackQueue.addLast(one);
packetSendTime.put(one.getSeqno(), Time.monotonicNow());
dataQueue.notifyAll();
}
}
LOG.debug("{} sending {}", this, one);
// write out data to remote datanode
// This is the actual write; flush is called to push the stream out
try (TraceScope ignored = dfsClient.getTracer().
newScope("DataStreamer#writeTo", spanId)) {
one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
errorState.markFirstNodeIfNotMarked();
throw e;
}
lastPacket = Time.monotonicNow();
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}
if (shouldStop()) {
continue;
}
// ③ If this is the last packet of the block, wait until every packet has been removed from the ackQueue (meaning the DataNode responses have arrived)
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (shouldStop()) {
continue;
}
endBlock();
}
if (progress != null) { progress.progress(); }
// This is used by unit test to trigger race conditions.
if (artificialSlowdown != 0 && dfsClient.clientRunning) {
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
// Log warning if there was a real error.
if (!errorState.isRestartingNode()) {
// Since their messages are descriptive enough, do not always
// log a verbose stack-trace WARN for quota exceptions.
if (e instanceof QuotaExceededException) {
LOG.debug("DataStreamer Quota Exception", e);
} else {
LOG.warn("DataStreamer Exception", e);
}
}
lastException.set(e);
assert !(e instanceof NullPointerException);
errorState.setInternalError();
if (!errorState.isNodeMarked()) {
// Not a datanode issue
streamerClosed = true;
}
} finally {
if (scope != null) {
scope.close();
scope = null;
}
}
}
closeInternal();
}
We raised three questions above:
① How is the timeout argument to wait determined?
Why wait at all? Look at the while condition:
while ((!shouldStop() && dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
now - lastPacket < halfSocketTimeout)) || doSleep)
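A dataQueue size of 0 means there is no data to send, so the sending logic below does not need to run and the thread can wait. As for why halfSocketTimeout is the value used, I think it is simply a trade-off made by the HDFS developers: you could use a smaller value, at the cost of sending a few more heartbeat packets. (Also, the latest HDFS community code has removed the wait-on-halfSocketTimeout logic here.)
A quick bit of arithmetic: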
long timeout = halfSocketTimeout - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
If timeout > 0, the wait actually ends at:
now + timeout = now + halfSocketTimeout - now + lastPacket = lastPacket + halfSocketTimeout;
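In other words, unless it is woken earlier, the thread waits until halfSocketTimeout has elapsed since the previous packet was sent. If the dataQueue is still empty at that point and the stage is DATA_STREAMING, the loop exits and a heartbeat packet is created.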
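The arithmetic can be checked with a small helper that restates the three timeout lines from the loop. HeartbeatTimeoutSketch and waitMillis are hypothetical names, and the boolean parameter stands in for the stage == DATA_STREAMING check.

```java
/** Restates the wait-time arithmetic from run(); hypothetical helper. */
class HeartbeatTimeoutSketch {
  /**
   * @param now               current monotonic time in ms
   * @param lastPacket        monotonic time in ms when the last packet was sent
   * @param halfSocketTimeout half of the socket timeout in ms
   * @param dataStreaming     stands in for stage == DATA_STREAMING
   */
  static long waitMillis(long now, long lastPacket, long halfSocketTimeout,
      boolean dataStreaming) {
    long timeout = halfSocketTimeout - (now - lastPacket);
    timeout = timeout <= 0 ? 1000 : timeout;   // already overdue: short wait
    return dataStreaming ? timeout : 1000;     // other stages poll every 1s
  }
}
```

With now = 10000, lastPacket = 4000, and halfSocketTimeout = 30000, the result is 24000, so the wait ends at 34000, which is lastPacket + halfSocketTimeout.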
② How the DataNode network connections, that is, the pipeline, are built
This is the most important part. Taking file creation as the example:
One detail is worth knowing. Suppose the block's DataNode list is [A, B, C]. The client first opens a socket connection to A, then A opens one to B, and B opens one to C. This is implemented as follows:
When the serialized request parameters are built and the list of target nodes to pass to the next hop is generated, PBHelperClient.convert is called with the original list and startIndex=1. Looking at the implementation of convert, startIndex = 1 means the new target list is always built starting from the second element of the list. For example: the target DataNode list starts as [A, B, C]; when the client connects to A, the target list it sends is [B, C], so A can connect to B, and likewise B -> C.
Now we know how the whole pipeline is constructed.
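The effect of startIndex = 1 at every hop can be illustrated with plain lists. This is a sketch, not the PBHelperClient.convert implementation: TargetListSketch, forwardTargets, and connections are hypothetical names, and DataNodes are represented by strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy illustration of forwarding a shrinking target list; hypothetical. */
class TargetListSketch {
  /** Returns the targets to forward: everything from startIndex onward. */
  static List<String> forwardTargets(List<String> targets, int startIndex) {
    return new ArrayList<>(targets.subList(startIndex, targets.size()));
  }

  /** Walks the pipeline and records which node connects to which. */
  static List<String> connections(String client, List<String> targets) {
    List<String> edges = new ArrayList<>();
    String from = client;
    List<String> remaining = targets;
    while (!remaining.isEmpty()) {
      String to = remaining.get(0);                // connect to the first target
      edges.add(from + "->" + to);
      from = to;
      remaining = forwardTargets(remaining, 1);    // pass on the rest
    }
    return edges;
  }
}
```

For the list [A, B, C] this yields the connections Client->A, A->B, and B->C, with A receiving [B, C] and B receiving [C].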
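③ How downstream DataNodes in the pipeline send acks upstream, and the operations on the ackQueue data structure. This is covered in the next section.
4. BlockReceiver & PacketResponder
As mentioned above, when the DataStreamer thread takes a DFSPacket from the dataQueue to send, it adds the packet to the ackQueue, meaning the packet is waiting for every DataNode in the pipeline to return an ack. So how does a DataNode return acks to its upstream DataNode and to the client? Let's take a look.
Recall that DataStreamer calls new Sender(xxx).writeBlock. That request is handled on the DataNode side by the writeBlock method of the DataXceiver class, which in turn delegates to BlockReceiver#receiveBlock. receiveBlock starts the PacketResponder thread, which responds to the packets received.
Look at the parameters of receiveBlock:
The three streams are, respectively, the output stream for sending data to the downstream DataNode, the input stream for receiving data from the downstream DataNode, and the output stream for replying to the upstream node. Now look at the PacketResponder constructor, which takes two of those streams as arguments:
type indicates whether this responder is on the last node of the pipeline or on an intermediate node.
Back in receiveBlock, there is this line: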
while (receivePacket() >= 0) { /* Receive until the last packet */ }
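This is a loop with an empty body: as long as data keeps arriving, receivePacket keeps being called. receivePacket returns the number of data bytes received. Inside, it reads the packet's fields from the input stream and then enqueues the packet for the PacketResponder thread to process in the background, as shown in the figure below:
Next, how does PacketResponder return a response to the upstream DataNode or the client? The run method of the PacketResponder thread eventually calls sendAckUpstreamUnprotected to send the ack upstream, as shown in the figure below:
Now we also know how a DataNode replies with ack information.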
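The receive-then-respond hand-off can be sketched with blocking queues standing in for the socket streams. This is a toy model under stated assumptions, not the BlockReceiver.PacketResponder code: ResponderSketch, its fields, and the LAST sentinel are all hypothetical, and packets are reduced to sequence numbers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy responder thread; hypothetical, queues replace the socket streams. */
class ResponderSketch implements Runnable {
  static final long LAST = -1L;                  // sentinel: no more packets
  final BlockingQueue<Long> pending = new LinkedBlockingQueue<>();
  final BlockingQueue<Long> downstreamAcks;      // null on the last node
  final BlockingQueue<Long> upstreamAcks;

  ResponderSketch(BlockingQueue<Long> downstreamAcks,
      BlockingQueue<Long> upstreamAcks) {
    this.downstreamAcks = downstreamAcks;
    this.upstreamAcks = upstreamAcks;
  }

  @Override
  public void run() {
    try {
      while (true) {
        long expected = pending.take();          // enqueued by the receiver
        if (expected == LAST) {
          return;
        }
        if (downstreamAcks != null) {            // intermediate node
          long acked = downstreamAcks.take();    // wait for downstream ack
          if (acked != expected) {
            throw new IllegalStateException("ack out of order: " + acked);
          }
        }
        upstreamAcks.put(expected);              // reply upstream
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Runs a two-node pipeline and returns the acks the client would see. */
  static List<Long> demo(int packets) {
    BlockingQueue<Long> lastToMiddle = new LinkedBlockingQueue<>();
    BlockingQueue<Long> middleToClient = new LinkedBlockingQueue<>();
    ResponderSketch last = new ResponderSketch(null, lastToMiddle);
    ResponderSketch middle = new ResponderSketch(lastToMiddle, middleToClient);
    for (long seqno = 0; seqno < packets; seqno++) {
      middle.pending.add(seqno);                 // the receive loop's enqueue
      last.pending.add(seqno);
    }
    middle.pending.add(LAST);
    last.pending.add(LAST);
    Thread t1 = new Thread(middle);
    Thread t2 = new Thread(last);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new ArrayList<>(middleToClient);
  }
}
```

The responder on the last node acks as soon as a packet is enqueued, while the intermediate responder first waits for the matching downstream ack, so the client sees acks in sequence-number order.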