背景
從事大數(shù)據(jù)以來,主要是spark的開發(fā)谚中,時不時總是會用到一些hdfs的上接口;半途開始大數(shù)據(jù),對hdfs不是很熟悉。每次用都是百度。所以這里做個總結(jié)迫肖,為了以后能更好的用好hdfs。
先說下幾個重要的類
FileSystem
FileSystem這個類非常重要劲室,相當于一個連接喉磁,類似數(shù)據(jù)庫中一個連接孕暇;連接還是比較費資源的座舍,因此可以弄一個連接池,后面復用這些連接就可以了缩焦。
一般獲取的代碼都是固定不變的。如下:
public static FileSystem getDFS() throws IOException {
conf = new Configuration();
//設(shè)置安全驗證方式為kerberos
conf.set("hadoop.security.authentication", "kerberos");
conf.set("fs.defaultFS", "hdfs://...");
return FileSystem.get(conf);
}
- 首先需要new一個Configuration塑荒,如果configuration中沒有設(shè)置相關(guān)的參數(shù),那么會取集群的配置文件,獲取里面的配置信息把篓。(一般我們也建議疗锐,不設(shè)置;因為主節(jié)點可能會變)
- 然后直接調(diào)用FileSystem.get()方法就可以得到一個FileSystem對象了关划。
FileSystem中有很多方法小染,跟File中的方法非常類似裤翩,如exists择份、delete利诺、mkdir娇斑、create等等一些常用的文件操作方法摩窃。
這里主要看讀和寫方法,讀是open方法
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file to open
*/
public FSDataInputStream open(Path f) throws IOException {
return open(f, getConf().getInt("io.file.buffer.size", 4096));
}
這個方法有一個讀緩存,默認是4096猾愿,如果想要設(shè)置這個值鹦聪,可以使用DistributedFileSystem類中open方法
open方法就是得到一個輸入流,這里再次強調(diào)java中的io相當重要啊蒂秘,要是理解了java中的io這里的操作看下api就會了泽本。
寫的話,只有append方法姻僧,而且一般是不推薦不適用該方法的规丽,這個代價會比較大。hdfs文件系統(tǒng)也是不支持修改操作的撇贺。append方法見名知意赌莺,就是在文件后面進行追加。(因為文件是分塊存放的松嘶,而且還有幾個副本艘狭,修改的代價會非常大)
create方法也要說一下,這個創(chuàng)建文件也分為兩種翠订,覆蓋和不覆蓋巢音;create有很多重載的方法,選擇一個自己用的就可以了尽超。
還有幾種比較重要的方法官撼;后面會提到,什么globStatus之類的
FileStatus
字面意思是文件的狀態(tài)似谁,其實我更傾向于理解為文件的屬性傲绣,F(xiàn)ileStatus中有一系列的方法,可以得到文件的信息棘脐。
像一些getLen()得到文件的長度斜筐,以字節(jié)的形式。isFile蛀缝、isDirectory顷链、getReplication一些見名知意的方法,就不多說了屈梁。
setOwner嗤练、setGroup、setPermission這些修改的方法在讶,在外面沒法使用煞抬,就不多說了。
之所有要先說這個FileStatus构哺,主要是為了好介紹下面非常有用的方法革答。
globStatus
這個不是一個類战坤,是一個方法,但是很重要残拐。
我們經(jīng)常需要處理一批文件途茫,有通配符來匹配是非常方便的。FileSystem提供了兩個通配符的方法
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
globStatus()方法返回于其路徑匹配的所有文件的FileStatus對象數(shù)組溪食,并按路徑排序囊卜。PathFilter可以作為選項進一步對匹配結(jié)果進行限制。對于通配符的支持可以看另一篇文章hadoop支持的通配符
IOUtils
IOUtils這樣類错沃,在文件系統(tǒng)中一般都會有栅组,這里主要是介紹hadoop中的。
下面來看個例子枢析,將本地文件復制到Hadoop文件系統(tǒng)
public static void main(String[] args) throws IOException {
File file = new File("E:\\文件\\學習/apache_hbase_reference_guide.pdf");
InputStream in = new BufferedInputStream(new FileInputStream(file));
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://...:9000");
FileSystem fileSystem = FileSystem.get(conf);
float fileSize = file.length();
FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("hdfs://.../world.pdf"), new Progressable() {
int count = 1;
@Override
public void progress() {
System.out.println((count * 1024 * 64)/fileSize);
count ++;
}
});
IOUtils.copyBytes(in, fsDataOutputStream, 4096, true);
}
Progressable類主要是展示進度的玉掸,重寫的 progress 方法在每次上傳了64KB(不是絕對的64KB,會有一定的偏差)字節(jié)大小的文件之后會自動調(diào)用一次登疗;因此我們給出一個大概的上傳進度排截。
我們這里主要是IkanIOUtils.copyBytes方法,這個方法有很多重載的辐益。
先看剛剛使用的
/**
* Copies from one stream to another.
*
* @param in InputStrem to read from
* @param out OutputStream to write to
* @param buffSize the size of the buffer
* @param close whether or not close the InputStream and
* OutputStream at the end. The streams are closed in the finally clause.
*/
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException
復制整個流,可以設(shè)置緩沖的大小脱吱。
注釋寫的非常清楚智政,一般也是建議使用這個方法,特別是boolean close這個參數(shù)箱蝠,最好傳遞true续捂,免得流沒有關(guān)閉,導致其他的一些問題宦搬。有一個方法和這個非常類似
/**
* Copies count bytes from one stream to another.
*
* @param in InputStream to read from
* @param out OutputStream to write to
* @param count number of bytes to copy
* @param close whether to close the streams
* @throws IOException if bytes can not be read or written
*/
public static void copyBytes(InputStream in, OutputStream out, long count, boolean close) throws IOException
這個方法是復制long count的byte的數(shù)據(jù)牙瓢。這個用的非常少,但是很容易和上面弄混间校。
FileUtil
和其他文件系統(tǒng)一樣矾克,hadoop中也有FileUtil這個工具類。先來看看這個stat2Paths憔足,這個方法會將一個數(shù)組的status轉(zhuǎn)化為數(shù)組的path對象胁附。
/**
* convert an array of FileStatus to an array of Path
*
* @param stats
* an array of FileStatus objects
* @return an array of paths corresponding to the input
*/
public static Path[] stat2Paths(FileStatus[] stats) {
if (stats == null)
return null;
Path[] ret = new Path[stats.length];
for (int i = 0; i < stats.length; ++i) {
ret[i] = stats[i].getPath();
}
return ret;
}
很多方法,看下api基本上就會用了滓彰,下面主要介紹一下控妻,上傳和下載;首先來看看上傳揭绑,也就是將文件本地拷貝到hadoop文件系統(tǒng)弓候,上面用IOUtils實現(xiàn)了一遍,下面用FileUtil也來實現(xiàn)一遍
private static void copyFileByFile() throws IOException {
File file = new File("E:\\文件\\學習/apache_hbase_reference_guide.pdf");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs:/...:9000");
FileSystem fileSystem = FileSystem.get(conf);
FileUtil.copy(file, fileSystem, new Path("/dxy/hello.pdf"), false, conf);
}
這個使用比IOUtils來的要方便點,但是沒有進度的展示菇存。根據(jù)實際需求使用吧
從hadoop中下載和上面非常類似夸研,就是hadoop文件系統(tǒng)拷貝到本地
private static void copyFileToLocal() throws IOException {
File file = new File("E:\\文件\\學習/hello.pdf");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://...:9000");
FileSystem fileSystem = FileSystem.get(conf);
FileUtil.copy(fileSystem, new Path("/dxy/hello.pdf"), file, false, conf);
}
還有hadoop上復制文件,和上面的也非常類似撰筷,只是目標的參數(shù)變了陈惰。
下面要介紹一個比較重要的方法,合并文件毕籽,因為hadoop中對小文件的處理是非常不擅長的抬闯,因此我們可能需要對小文件進行合并。FileUtil中提供了一個方法copyMerge方法关筒,
/** Copy all files in a directory to one output file (merge). */
public static boolean copyMerge(FileSystem srcFS, Path srcDir,
FileSystem dstFS, Path dstFile,
boolean deleteSource,
Configuration conf, String addString) throws IOException {
// 簡直合并之后的文件名是否滿足要求
dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
// 如果要合并的目錄溶握,下面還是目錄,則返回false
if (!srcFS.getFileStatus(srcDir).isDirectory())
return false;
// 創(chuàng)建目標文件
OutputStream out = dstFS.create(dstFile);
try {
FileStatus contents[] = srcFS.listStatus(srcDir);
Arrays.sort(contents); // 按照文件名進行排序
for (int i = 0; i < contents.length; i++) {
if (contents[i].isFile()) {
InputStream in = srcFS.open(contents[i].getPath());
try {
// 將要合并的文件復制到目標文件中蒸播,這里conf傳遞過去睡榆,就是為了得到io.file.buffer.size參數(shù),作為寫緩存的大小
IOUtils.copyBytes(in, out, conf, false);
if (addString!=null)
// 每個合并文件完了之后袍榆,添加一個addString
out.write(addString.getBytes("UTF-8"));
} finally {
in.close();
}
}
}
} finally {
out.close();
}
if (deleteSource) {
return srcFS.delete(srcDir, true);
} else {
return true;
}
}
代碼還是比較簡單的胀屿。如果只是簡單的合并,這個方法完全夠用了包雀。如果有個需求宿崭,合并之后,仍然可以找到之前文件名對應的文件內(nèi)容才写;當然我們也可以改寫這個方法葡兑,將addString改為文件的名稱,只需將添加addString代碼去掉赞草,添加上下面這行就行了讹堤。
out.write(contents[i].getPath().getName().getBytes("UTF-8"));
當然為了更快的讀到想要的內(nèi)容,我也可以弄一個類似的目錄的東西厨疙,快速定位到文件內(nèi)容在哪洲守。
hadoop的api還是很容易上手的,多用多總結(jié)轰异。