Hadoop讀書筆記2-HDFS-read write file
HDFS是一個分布式文件系統(tǒng)田晚,在HDFS上寫文件的過程與我們平時使用的單機文件系統(tǒng)非常不同,從宏觀上來看国葬,在HDFS文件系統(tǒng)上創(chuàng)建并寫一個文件贤徒,流程如下圖(來自《Hadoop:The Definitive Guide》一書)所示:
(注意L:這是讀文件的示意圖 芹壕。 貼錯圖了)
具體過程描述如下:
- Client調(diào)用DistributedFileSystem對象的create方法,創(chuàng)建一個文件輸出流(FSDataOutputStream)對象
- 通過DistributedFileSystem對象與Hadoop集群的NameNode進行一次RPC遠程調(diào)用接奈,在HDFS的Namespace中創(chuàng)建一個文件條目(Entry)踢涌,該條目沒有任何的Block
- 通過FSDataOutputStream對象,向DataNode寫入數(shù)據(jù)序宦,數(shù)據(jù)首先被寫入FSDataOutputStream對象內(nèi)部的Buffer中睁壁,然后數(shù)據(jù)被分割成一個個Packet數(shù)據(jù)包
- 以Packet最小單位,基于Socket連接發(fā)送到按特定算法選擇的HDFS集群中一組DataNode(正常是3個互捌,可能大于等于1)中的一個節(jié)點上潘明,在這組DataNode組成的Pipeline上依次傳輸Packet
- 這組DataNode組成的Pipeline反方向上,發(fā)送ack疫剃,最終由Pipeline中第一個DataNode節(jié)點將Pipeline ack發(fā)送給Client
- 完成向文件寫入數(shù)據(jù)钉疫,Client在文件輸出流(FSDataOutputStream)對象上調(diào)用close方法,關(guān)閉流
- 調(diào)用DistributedFileSystem對象的complete方法巢价,通知NameNode文件寫入成功
下面代碼使用Hadoop的API來實現(xiàn)向HDFS的文件寫入數(shù)據(jù)牲阁,同樣也包括創(chuàng)建一個文件和寫數(shù)據(jù)兩個主要過程,代碼如下所示:
static String[] contents = new String[] {
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
"cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
"dddddddddddddddddddddddddddddddd",
"eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
};
public static void main(String[] args) {
String file = "hdfs://h1:8020/data/test/test.log";
Path path = new Path(file);
Configuration conf = new Configuration();
FileSystem fs = null;
FSDataOutputStream output = null;
try {
fs = path.getFileSystem(conf);
output = fs.create(path); // 創(chuàng)建文件
for(String line : contents) { // 寫入數(shù)據(jù)
output.write(line.getBytes("UTF-8"));
output.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
結(jié)合上面的示例代碼壤躲,我們先從fs.create(path);開始城菊,可以看到FileSystem的實現(xiàn)DistributedFileSystem中給出了最終返回FSDataOutputStream對象的抽象邏輯,代碼如下所示:
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
return new FSDataOutputStream
(dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics);
}
下面碉克,我們從DFSOutputStream類開始凌唬,說明其內(nèi)部實現(xiàn)原理。
DFSOutputStream內(nèi)部原理
打開一個DFSOutputStream流漏麦,Client會寫數(shù)據(jù)到流內(nèi)部的一個緩沖區(qū)中客税,然后數(shù)據(jù)被分解成多個Packet,每個Packet大小為64k字節(jié)撕贞,每個Packet又由一組chunk和這組chunk對應(yīng)的checksum數(shù)據(jù)組成更耻,默認chunk大小為512字節(jié),每個checksum是對512字節(jié)數(shù)據(jù)計算的校驗和數(shù)據(jù)捏膨。
當Client寫入的字節(jié)流數(shù)據(jù)達到一個Packet的長度秧均,這個Packet會被構(gòu)建出來,然后會被放到隊列dataQueue中号涯,接著DataStreamer線程會不斷地從dataQueue隊列中取出Packet目胡,發(fā)送到復制Pipeline中的第一個DataNode上,并將該Packet從dataQueue隊列中移到ackQueue隊列中链快。ResponseProcessor線程接收從Datanode發(fā)送過來的ack誉己,如果是一個成功的ack,表示復制Pipeline中的所有Datanode都已經(jīng)接收到這個Packet久又,ResponseProcessor線程將packet從隊列ackQueue中刪除巫延。
在發(fā)送過程中效五,如果發(fā)生錯誤,所有未完成的Packet都會從ackQueue隊列中移除掉炉峰,然后重新創(chuàng)建一個新的Pipeline畏妖,排除掉出錯的那些DataNode節(jié)點,接著DataStreamer線程繼續(xù)從dataQueue隊列中發(fā)送Packet疼阔。
下面是DFSOutputStream的結(jié)構(gòu)及其原理戒劫,如圖所示:
我們從下面3個方面來描述內(nèi)部流程:
- 創(chuàng)建Packet
Client寫數(shù)據(jù)時,會將字節(jié)流數(shù)據(jù)緩存到內(nèi)部的緩沖區(qū)中婆廊,當長度滿足一個Chunk大醒赶浮(512B)時,便會創(chuàng)建一個Packet對象淘邻,然后向該Packet對象中寫Chunk Checksum校驗和數(shù)據(jù)茵典,以及實際數(shù)據(jù)塊Chunk Data,校驗和數(shù)據(jù)是基于實際數(shù)據(jù)塊計算得到的宾舅。每次滿足一個Chunk大小時统阿,都會向Packet中寫上述數(shù)據(jù)內(nèi)容,直到達到一個Packet對象大谐镂摇(64K)扶平,就會將該Packet對象放入到dataQueue隊列中,等待DataStreamer線程取出并發(fā)送到DataNode節(jié)點蔬蕊。
- 發(fā)送Packet
DataStreamer線程從dataQueue隊列中取出Packet對象结澄,放到ackQueue隊列中,然后向DataNode節(jié)點發(fā)送這個Packet對象所對應(yīng)的數(shù)據(jù)岸夯。
- 接收ack
發(fā)送一個Packet數(shù)據(jù)包以后麻献,會有一個用來接收ack的ResponseProcessor線程,如果收到成功的ack猜扮,則表示一個Packet發(fā)送成功赎瑰。如果成功,則ResponseProcessor線程會將ackQueue隊列中對應(yīng)的Packet刪除破镰。