我們先來看看Hadoop源碼中對DFS Client的注釋說明:
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
* perform basic file tasks. It uses the ClientProtocol
* to communicate with a NameNode daemon, and connects
* directly to DataNodes to read/write block data.
*
* Hadoop DFS users should obtain an instance of
* DistributedFileSystem, which uses DFSClient to handle
* filesystem tasks.
********************************************************/
顯然泄隔,DFSClient在DistributedFileSystem和NameNode和之間起到了橋梁的作用拒贱。hdfs用戶需要獲取一個DistributedFileSystem實例對象來處理文件系統(tǒng)的一些任務(wù)。而DistributedFileSystem實例對象又需要使用DFSClient來完成用戶提出的任務(wù)需求佛嬉。而DFSClient則又通過ClientProtocol來連接Namenode逻澳,以及直接連接Datanode來讀取block數(shù)據(jù)。這就是它們?nèi)叩膮f(xié)作關(guān)系暖呕。
接下來斜做,我們來看看它們具體的實現(xiàn)過程。接上一篇湾揽,ls方法里的第二個步驟瓤逼,需要根據(jù)Path對象來獲取FileSystem對象:
FileSystem srcFs = srcPath.getFileSystem(this.getConf());
srcPath變量是Path對象笼吟,接著我們來看看Path對象里的getFileSystem這個方法:
public FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(this.toUri(), conf);
}
馬上,又獲取到Path對象的uri格式霸旗,接著調(diào)用FileSystem對象的get方法贷帮。
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
在FileSystem對象的get方法中會先解析uri的scheme和authority。scheme確定文件系統(tǒng)的類型诱告,authority確定訪問地址(ip)和端口(port)撵枢。如果scheme和authority都不為null,則正常獲取FileSystem對象蔬啡;如果scheme不為null诲侮,而authority為null,且scheme和默認的匹配箱蟆,則根據(jù)默認的scheme和authority來創(chuàng)建FileSystem對象沟绪;若scheme和authority同為null,則也根據(jù)默認的scheme和authority來創(chuàng)建FileSystem對象空猜。
在我們之前的例子里“-ls /test”绽慈,是scheme和authority同為null的情況,所以需要默認的scheme和authority來創(chuàng)建FileSystem對象辈毯。那么坝疼,我們來看看默認獲取scheme和authority的過程。
跳轉(zhuǎn)到FileSystem對象的get(Conf)方法:
public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}
public static URI getDefaultUri(Configuration conf) {
return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, "file:///")));
}
其中谆沃,F(xiàn)S_DEFAULT_NAME_KEY="fs.default.name"钝凶。conf.get(FS_DEFAULT_NAME_KEY, "file:///")從配置文件里獲取屬性fs.default.name的值(這和我們對配置文件的配置有關(guān),我們配置的是"hdfs://localhost")唁影,如果獲取為空耕陷,則設(shè)為"file:///"。所以我們可以預見据沈,默認的uri的scheme是hdfs哟沫,默認的uri的authority是localhost。獲取到默認的uri后锌介,然后再返回上面的get方法重新解析uri嗜诀。解析獲取scheme和authority都不為空,于是繼續(xù)執(zhí)行孔祸。
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
繼續(xù)執(zhí)行過程有一步是說隆敢,看看配置中有沒有配置fs.hdfs.impl.disable.cache這個參數(shù),如果設(shè)置里這個參數(shù)為true崔慧,則不在緩存中創(chuàng)建文件系統(tǒng)筑公。如果設(shè)置了false或者沒有設(shè)置,則在緩存中創(chuàng)建文件系統(tǒng)尊浪。
因為匣屡,我們是沒有設(shè)置這個參數(shù)的,所以接下來會在緩存中創(chuàng)建文件系統(tǒng)拇涤。緩存的實現(xiàn)具體實現(xiàn)我先不看捣作,但是個值得關(guān)注的點。需要注意一點的是鹅士,Cache是FileSystem的內(nèi)部靜態(tài)類券躁。
我們繼續(xù)看到Cache里的get方法:
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
FileSystem fs = null;
synchronized (this) {
fs = map.get(key);
}
if (fs != null) {
return fs;
}
需要指出的是,這里的map是Cache類的一個成員變量掉盅,map的key是封裝了uri和conf的一個Key對象(是Cache類的一個內(nèi)部靜態(tài)類)也拜,value是FileSystem類型。
首先趾痘,get方法將uri和conf封裝成一個key值慢哈,然后企圖從map中根據(jù)key值(以多線程并發(fā))來獲取相應(yīng)的FileSystem。如果獲取到了永票,則直接返回從map中獲取的FileSystem卵贱,否則創(chuàng)建新的FileSystem:
fs = createFileSystem(uri, conf); // 創(chuàng)建文件系統(tǒng)
private static FileSystem createFileSystem(URI uri, Configuration conf
) throws IOException {
Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
LOG.debug("Creating filesystem for " + uri);
if (clazz == null) {
throw new IOException("No FileSystem for scheme: " + uri.getScheme());
}
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
return fs;
}
我們可以看到,在FileSystem類的createFileSystem方法中侣集,先用scheme在conf中找到對應(yīng)的FileSystem的類類型(例如键俱,scheme:hdfs對應(yīng)了DistributedFileSystem),然后利用Java反射機制來實例化該類類型世分,即創(chuàng)建一個具體的FileSystem對象编振。
接下來,還有非常重要的一步臭埋,那就是DistributedFileSystem.initialize(uri, conf)踪央。
這里,要提兩點斋泄。第二點非常重要杯瞻!
一、我自己目前看到的炫掐,hadoop的源碼中很多對類的構(gòu)造就分兩步魁莉,先是構(gòu)造函數(shù)初始化一些內(nèi)部變量(其實,DistributedFileSystem類的構(gòu)造函數(shù)已經(jīng)不做任何事了)募胃,然后再用initialize方法初始化一些變量旗唁。這肯定有它的設(shè)計目的在,暫不深究痹束,先知道有這么個意思就好检疫。
二、這里的initialize方法是DistributedFileSystem類的方法祷嘶,不是FileSystem類的屎媳。之前提到過夺溢,F(xiàn)ileSystem是一個抽象類,DistributedFileSystem繼承了它烛谊。所以风响,這里fs是父抽象類對子類的一個引用。如果子類沒有重寫父類的方法丹禀,則調(diào)用父類方法状勤;如果子類重寫了父類的方法,則默認調(diào)用子類重寫的方法双泪。另外加一句持搜,如果是子類新添加的方法,那么這種方法對該引用是不可見的焙矛。
而這里的initialize方法就屬于子類重寫了父方法的那種葫盼。我一開始沒發(fā)現(xiàn)(java基礎(chǔ)薄弱),然后在FileSystem類里的initialize方法里打轉(zhuǎn)轉(zhuǎn)薄扁。
好剪返,言歸正傳,繼續(xù)看DistributedFileSystem類的initialize方法做了些什么邓梅。
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
String host = uri.getHost();
if (host == null) {
throw new IOException("Incomplete HDFS URI, no host: "+ uri);
}
InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
this.dfs = new DFSClient(namenode, conf, statistics);
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory();
}
首先脱盲,它調(diào)用了super.initialize(uri, conf)方法,其實就是FileSystem.initialize(uri, conf)日缨,也就是我掉過的坑钱反,現(xiàn)在知道為什么掉入了那個坑就出不來了吧。
然后匣距,設(shè)置了配置setConf(conf)面哥。
接著,創(chuàng)建了一個InetSocketAddress對象namenode(一個由hostname毅待、IP尚卫、port構(gòu)成的對象,并實現(xiàn)了序列化)尸红,用來socket通信吱涉,還沒深入了解。
以及外里,初始化uri怎爵、workingDir,以及我們今天的另一個主角DFSClient盅蝗。
我們接下來就來看看DFSClient創(chuàng)建的過程(這是DistributedFileSystem類的initialize過程的一部分鳖链,我們不要忘了我們的目的,以及我們開篇提到過的墩莫,DistributedFileSystem需要使用DFSCclient類來完成在文件系統(tǒng)上的任務(wù)芙委。)
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
this(nameNodeAddr, null, conf, stats);
}
提一句逞敷,F(xiàn)ileSystem.Statistics對象記錄了整個文件系統(tǒng)的一些信息,包括讀/寫字節(jié)數(shù)题山、讀/寫次數(shù)等等兰粉。
/**
* Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.
* Exactly one of nameNodeAddr or rpcNamenode must be null.
* 創(chuàng)建一個DFSClient用來連接到給定的nameNodeAddr或者rpcNameNode, 兩者必有一個為null(為什么呢?)
*/
DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
this.conf = conf;
this.stats = stats;
this.nnAddress = nameNodeAddr;
this.socketTimeout = conf.getInt("dfs.socket.timeout",
HdfsConstants.READ_TIMEOUT);
this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
HdfsConstants.WRITE_TIMEOUT);
this.timeoutValue = this.socketTimeout;
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
// dfs.write.packet.size is an internal config variable
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
this.hdfsTimeout = Client.getTimeout(conf);
ugi = UserGroupInformation.getCurrentUser();
this.authority = nameNodeAddr == null? "null":
nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
String taskId = conf.get("mapred.task.id", "NONMAPREDUCE");
this.clientName = "DFSClient_" + taskId + "_" +
r.nextInt() + "_" + Thread.currentThread().getId();
defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
defaultReplication = (short) conf.getInt("dfs.replication", 3);```
DFSClient初始化了很多變量顶瞳,有配置信息conf、文件系統(tǒng)的統(tǒng)計信息stats愕秫、Namenode的socket地址慨菱、socket連接超時上限、datanode寫超時上限戴甩、socket工廠符喝、寫數(shù)據(jù)包的大小、block獲取重復次數(shù)甜孤、默認塊大小协饲、默認塊備份個數(shù)等等。
**這里我先留一個問題缴川,hadoop源碼的DFSClient類的注釋里說明:創(chuàng)建一個DFSClient用來連接到給定的nameNodeAddr或者rpcNameNode, 兩者必有一個為null茉稠。為什么兩者必有一個是null?這個我不明白把夸。這應(yīng)該和hadoop ipc機制有關(guān)而线。也是我接下來需要關(guān)注的重點。**
接下來恋日,DFSClient的創(chuàng)建來到一個**非常重要的步驟**:
if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
this.namenode = createNamenode(this.rpcNamenode, conf);
} else if (nameNodeAddr == null && rpcNamenode != null) {
//This case is used for testing.
this.namenode = this.rpcNamenode = rpcNamenode;
} else {
throw new IllegalArgumentException(
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
+ "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
}
在nameNodeAddr不為null膀篮,而rpcNamenode為null的情況下(例如,我在調(diào)試代碼事岂膳,nameNodeAddr=localhost/127.0.0.1:8020誓竿,rpcNamenode為null),會創(chuàng)建RPCNamenode對象(createRPCNamenode)和Namenode對象(createNamenode)谈截,用來建立到Namenode節(jié)點的IPC連接筷屡。這里就涉及hadoop ipc機制的核心,我現(xiàn)在還不理解傻盟,需要等學習之后再來講速蕊。
// read directly from the block file if configured.
this.shortCircuitLocalReads = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug("Short circuit read is " + shortCircuitLocalReads);
}
this.connectToDnViaHostname = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug("Connect to datanode via hostname is " + connectToDnViaHostname);
}
String localInterfaces[] =
conf.getStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
if (null == localInterfaces) {
localInterfaces = new String[0];
}
this.localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
LOG.debug("Using local interfaces [" +
StringUtils.join(",",localInterfaces)+ "] with addresses [" +
StringUtils.join(",",localInterfaceAddrs) + "]");
}
接著,DFSClient又設(shè)置了一些配置參數(shù)娘赴。好吧规哲,這些參數(shù)意思也不是很明確。shortCircuitLocalReads的意思是說是否支持本地讀确瘫怼唉锌;connectToDnViaHostname是說是否支持用hostname來連接DataNode隅肥。
至此,DFSClient就創(chuàng)建好了(其實我們還沒講它的核心T^T)袄简。那么腥放,DistributedFileSystem的initialize過程也Ok了。接下來要回到哪兒了呢绿语?
就要返回到FsShell里ls第二個步驟(獲取FileSystem)完了之后的那個步驟(在FileSystem上獲取文件信息)秃症。
第三個步驟,我們簡略的講吕粹,但我們需要明確一點种柑,看下面。
獲取文件信息這個過程會從FsShell類通過方法調(diào)用到FileSystem類匹耕,再到DistributedFileSystem類聚请,再到DFSClient類里的getFileInfo方法。這個getFileInfo方法又調(diào)用了namenode.getFileInfo方法稳其。**注意驶赏,namenode是ClientProtocol類型的,是一個接口既鞠,沒有任何方法的實現(xiàn)**
// DFSClient類里的getFileInfo
public HdfsFileStatus getFileInfo(String src) throws IOException {
checkOpen(); // 檢查路徑是否為空或null
try {
System.out.println("getFileInfo's src " + src);
return namenode.getFileInfo(src);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class);
}
}
那么煤傍,namenode的getFileInfo方法又怎能獲取到文件信息呢?
我們想到一點损趋,這個namenode就是之前我們創(chuàng)建DFSClient是初始化的namenode變量患久,它建立了DFSClient和Namenode節(jié)點之間的連接。那么浑槽,會不會因為通信的關(guān)系蒋失,其實是調(diào)用了Namenode里的getFileInfo方法呢?(其實這樣想還有另外一個原因桐玻,那就是這個變量的名字)
然后我們看到篙挽,在Namenode類里,果然有g(shù)etFileInfo方法镊靴,而且有具體的實現(xiàn):
public HdfsFileStatus getFileInfo(String src) throws IOException {
System.out.println("call this function NameNode->getFileInfo()");
myMetrics.incrNumFileInfoOps();
return namesystem.getFileInfo(src);
}
而且铣卡,我們通過跟蹤調(diào)用,確實證明了調(diào)用Namenode的getFileInfo這個方法偏竟。我們通過這個方法繼續(xù)跟蹤下去煮落,就能看到獲取文件信息的具體操作:
/**
- Create FileStatus by file INode
*/
private static HdfsFileStatus createFileStatus(byte[] path, INode node) {
// length is zero for directories
return new HdfsFileStatus(
node.isDirectory() ? 0 : ((INodeFile)node).computeContentSummary().getLength(),
node.isDirectory(),
node.isDirectory() ? 0 : ((INodeFile)node).getReplication(),
node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
node.getModificationTime(),
node.getAccessTime(),
node.getFsPermission(),
node.getUserName(),
node.getGroupName(),
path);
}
獲取的文件信息最終以FileStatus對象的形式返回給FsShell里的ls方法,ls再從FileStatus對象中獲取具體的文件信息踊谋,將其打印出來:
上面所說蝉仇,我們需要明確的一點就是,**DFSClient會把方法的調(diào)用發(fā)給Namenode節(jié)點來執(zhí)行,且執(zhí)行的是Namenode自己的方法轿衔。**提一點沉迹,我們發(fā)現(xiàn)這里的方法接口是一樣的,因為Namnode也實現(xiàn)了ClientProtocol接口害驹。
###總結(jié)一下###
* DFSClient在DistributedFileSystem和NameNode和之間起到了橋梁的作用
* 創(chuàng)建DistributedFileSystem類用到了Java反射機制
* DistributedFileSystem.initialize方法初始化了uri鞭呕、workingDir變量,以及非常重要的創(chuàng)建DFSClient對象
* 創(chuàng)建DFSClient對象除了初始化一些變量外宛官,還建立了和Namenode節(jié)點的連接