1. HDFS Overall Architecture Diagram
[Figure: HDFS源碼閱讀之整體框架圖.png]
2. A Java Example of HDFS Client Reads and Writes
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;

public class HDFSDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000"); // set the HDFS address
        try {
            // create the HDFS FileSystem object
            FileSystem fs = FileSystem.get(conf);

            // write a file
            Path filePath = new Path("/path/to/file.txt");
            FSDataOutputStream outputStream = fs.create(filePath);
            String content = "Hello, HDFS!";
            outputStream.writeBytes(content);
            outputStream.close();

            // read the file back
            FSDataInputStream inputStream = fs.open(filePath);
            byte[] buffer = new byte[1024];
            int bytesRead = inputStream.read(buffer);
            String fileContent = new String(buffer, 0, bytesRead);
            System.out.println("File content: " + fileContent);
            inputStream.close();

            // delete the file (false = non-recursive)
            fs.delete(filePath, false);

            // close the FileSystem object
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
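One caveat with the demo above: a single read(byte[]) call may return fewer bytes than the file contains, so real code should loop or use a helper. A minimal sketch using Hadoop's own IOUtils (org.apache.hadoop.io.IOUtils), assuming the same fs and filePath as above:

// copy the whole stream to stdout; 4096 is the buffer size,
// false means copyBytes does not close the streams itself
try (FSDataInputStream in = fs.open(filePath)) {
    org.apache.hadoop.io.IOUtils.copyBytes(in, System.out, 4096, false);
}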
3. HDFS Client Source Code Analysis
3.1 Client Initialization
[Figure: hdfs client初始化的時(shí)序圖.JPG]
Key code in FileSystem:
private FileSystem getInternal(URI uri, Configuration conf, Key key)
    throws IOException {
  FileSystem fs;
  synchronized (this) {
    // look up fs in Cache.map
    fs = map.get(key);
  }
  if (fs != null) {
    return fs;
  }
  // create a new fs
  fs = createFileSystem(uri, conf);
  synchronized (this) { // refetch the lock again
    FileSystem oldfs = map.get(key);
    if (oldfs != null) { // a file system is created while lock is releasing
      fs.close(); // close the new file system
      return oldfs; // return the old file system
    }
    // omitted...
    // save into Cache.map
    fs.key = key;
    map.put(key, fs);
    // omitted...
  }
  return fs;
}

private static FileSystem createFileSystem(URI uri, Configuration conf)
    throws IOException {
  Tracer tracer = FsTracer.get(conf);
  try (TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
    scope.addKVAnnotation("scheme", uri.getScheme());
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
    // create the concrete FileSystem (e.g. DistributedFileSystem) via reflection
    FileSystem fs = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
    // DistributedFileSystem.initialize
    fs.initialize(uri, conf);
    return fs;
  }
}
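The Cache.map lookup above means that repeated FileSystem.get() calls with the same URI and user return the same cached instance. A minimal sketch of the consequences (the per-scheme cache switch shown in the last comment is the standard fs.<scheme>.impl.disable.cache key):

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");

FileSystem fs1 = FileSystem.get(conf);
FileSystem fs2 = FileSystem.get(conf);
System.out.println(fs1 == fs2); // true: both come from Cache.map

// newInstance() bypasses the cache and always goes through createFileSystem()
FileSystem fs3 = FileSystem.newInstance(conf);
System.out.println(fs1 == fs3); // false

// note: closing a cached instance affects every holder of that reference,
// so long-lived services usually close it only at shutdown; the cache can
// also be disabled per scheme:
// conf.setBoolean("fs.hdfs.impl.disable.cache", true);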
Key code in DFSClient:
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
    Configuration conf, FileSystem.Statistics stats) throws IOException {
  // omitted...
  ProxyAndInfo<ClientProtocol> proxyInfo = null;
  // omitted...
  if (proxyInfo != null) {
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
  } else if (rpcNamenode != null) {
    // This case is used for testing.
    Preconditions.checkArgument(nameNodeUri == null);
    this.namenode = rpcNamenode;
    dtService = null;
  } else {
    Preconditions.checkArgument(nameNodeUri != null,
        "null URI");
    // use NameNodeProxiesClient to create the ProxyAndInfo
    proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(conf,
        nameNodeUri, nnFallbackToSimpleAuth);
    this.dtService = proxyInfo.getDelegationTokenService();
    // the namenode proxy is of type ClientProtocol
    this.namenode = proxyInfo.getProxy();
  }
  // omitted...
  // initialize the SASL client for secure data transfer
  this.saslClient = new SaslDataTransferClient(
      conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
      TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
}
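The namenode field set above is just an RPC proxy implementing ClientProtocol, so every metadata operation the client performs is a method on it. A minimal sketch of talking to the NameNode directly through such a proxy; these are private HDFS classes (org.apache.hadoop.hdfs.*), so this is illustrative only, and the null argument stands in for nnFallbackToSimpleAuth:

// hypothetical direct use of the internal proxy, mirroring what DFSClient does
ProxyAndInfo<ClientProtocol> proxyInfo =
    NameNodeProxiesClient.createProxyWithClientProtocol(
        conf, URI.create("hdfs://localhost:9000"), null);
ClientProtocol namenode = proxyInfo.getProxy();

// ask the NameNode for the block locations of the first 1 MB of a file;
// this is the same RPC that DFSInputStream.openInfo() ends up triggering (see 3.2)
LocatedBlocks blocks =
    namenode.getBlockLocations("/path/to/file.txt", 0, 1024 * 1024);
System.out.println("block count: " + blocks.locatedBlockCount());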
3.2 Reading Data
[Figure: hdfs client讀取數(shù)據(jù)的時(shí)序圖.JPG]
- FileSystem initialization: the Client obtains the NameNodeRpcServer proxy object and establishes RPC communication with the NameNode
- The caller invokes FileSystem's open() method; since the implementation class is DistributedFileSystem, it is that class's open() that actually runs
- DistributedFileSystem holds a reference to DFSClient and in turn calls DFSClient.open(), which:
  - 1. instantiates the DFSInputStream input stream, which gets wrapped as an HdfsDataInputStream
  - 2. calls DFSInputStream.openInfo(), which calls DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength() to fetch the block information and the length of the last block; that method in turn calls DFSClient.getLocatedBlocks() to obtain the block information, and finally DFSClient.callGetBlockLocations() invokes NameNodeRpcServer.getBlockLocations() through the NameNode proxy object
- The retrieved block information is stored in the locatedBlocks member variable of the DFSInputStream
- Reading the actual content of the HDFS file is handed to HdfsDataInputStream's base class FSDataInputStream.read, which delegates to the wrapped DFSInputStream (a subclass of FSInputStream); see the sketch after this list
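A minimal sketch of this call chain from the user's side (assuming the conf from section 2; the instanceof check just makes the wrapping visible):

FileSystem fs = FileSystem.get(conf);
FSDataInputStream in = fs.open(new Path("/path/to/file.txt"));

// the stream handed back is the HDFS-specific wrapper
System.out.println(in instanceof HdfsDataInputStream); // true

// reading through a java.nio.ByteBuffer ends up in
// DFSInputStream.read(ByteBuffer), shown at the end of this section
ByteBuffer buf = ByteBuffer.allocate(4096);
int n = in.read(buf);
System.out.println("read " + n + " bytes");
in.close();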
Key code in DistributedFileSystem (the FileSystemLinkResolver handles symlinks: doCall() runs against this file system, next() retries on the file system a link resolves to):
@Override
public FSDataInputStream open(Path f, final int bufferSize)
    throws IOException {
  statistics.incrementReadOps(1);
  storageStatistics.incrementOpCounter(OpType.OPEN);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataInputStream>() {
    @Override
    public FSDataInputStream doCall(final Path p) throws IOException {
      // DFSClient.open()
      final DFSInputStream dfsis =
          dfs.open(getPathName(p), bufferSize, verifyChecksum);
      // ultimately returns an HdfsDataInputStream
      return dfs.createWrappedInputStream(dfsis);
    }
    @Override
    public FSDataInputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.open(p, bufferSize);
    }
  }.resolve(this, absF);
}
Key code in DFSInputStream:
// fetch block information via the DFSClient and store it in the member variable locatedBlocks
void openInfo(boolean refreshLocatedBlocks) throws IOException {
  final DfsClientConf conf = dfsClient.getConf();
  synchronized (infoLock) {
    // fetchLocatedBlocksAndGetLastBlockLength() fetches the block information and
    // the length of the last block; internally it calls DFSClient.getLocatedBlocks()
    // and saves the result into locatedBlocks
    lastBlockBeingWrittenLength =
        fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
    int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
    while (retriesForLastBlockLength > 0) {
      // Getting last block length as -1 is a special case. When cluster
      // restarts, DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length. Lets
      // retry for 3 times to get the length.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(conf.getRetryIntervalForGetLastBlockLength());
        lastBlockBeingWrittenLength =
            fetchLocatedBlocksAndGetLastBlockLength(true);
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (lastBlockBeingWrittenLength == -1
        && retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }
}
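The retry count and interval above come from DfsClientConf. If my reading of the corresponding HdfsClientConfigKeys entries is right (treat the exact key names as an assumption), they can be tuned on the client side:

Configuration conf = new Configuration();
// how many times to re-fetch when the last block length comes back as -1
// (default 3, matching the comment in openInfo above) -- assumed key name
conf.setInt("dfs.client.retry.times.get-last-block-length", 5);
// wait between those retries, in milliseconds (default 4000) -- assumed key name
conf.setInt("dfs.client.retry.interval-ms.get-last-block-length", 2000);
FileSystem fs = FileSystem.get(conf);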
Key code on the read path, DFSInputStream.read(ByteBuffer) (DFSInputStream extends FSInputStream; this is the method FSDataInputStream.read delegates to):
@Override
public synchronized int read(final ByteBuffer buf) throws IOException {
  ReaderStrategy byteBufferReader =
      new ByteBufferStrategy(buf, readStatistics, dfsClient);
  return readWithStrategy(byteBufferReader);
}
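Besides this stateful read, DFSInputStream also supports positional reads (pread) through the PositionedReadable interface, which read at an absolute offset without moving the stream's current position. A minimal sketch of the two styles side by side, assuming the fs from section 2:

FSDataInputStream in = fs.open(new Path("/path/to/file.txt"));

// stateful read: advances the stream position
ByteBuffer buf = ByteBuffer.allocate(1024);
int n = in.read(buf);

// positional read (pread): reads at an absolute offset and leaves the
// stream position untouched, so it is safe for concurrent readers
byte[] b = new byte[512];
int m = in.read(0L, b, 0, b.length);

in.close();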