Java 接口
本文為 《Hadoop The Definitive Guide 4th Edition》的讀書筆記(或者叫翻譯),僅限交流使用, 轉(zhuǎn)載請注明出處筷狼。
在本節(jié)中瓶籽,我們開看看 Hadoop FileSystem 類。一個 Hadoop 文件系統(tǒng)的API桑逝。雖然我們只是探討一下 HDFS 的實現(xiàn)棘劣,DistributedFileSystem。你也應該在必要時看看他的其他實現(xiàn)楞遏。
從 Hadoop URL 讀取數(shù)據(jù)
讀取數(shù)據(jù)的一個簡單方式茬暇,是直接使用 java.net.URL 對象首昔,用他來打開一個流并讀取數(shù)據(jù)。
InputStream in = null;
try {
in = new URL("hdfs://host/path").openStream();
//process in
} finally {
IOUtils.closeStream(in);
}
完整的代碼如下糙俗,注意看注釋
例子3-1
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
/**
* 使用java.net.URL API讀取HDFS中的數(shù)據(jù)
* Created by henvealf on 16-9-23.
*/
public class URLCat {
/**
* 因為下面的方法在同一個JVM中只能被調(diào)用一次,所以將其放在靜態(tài)的代碼快中就可以。
* 如果在這之前突然有一個在你控制之外的第三方的組件調(diào)用此方法滔吠。
* 你就不能在這樣來獲取Hadoop中的數(shù)據(jù)
*/
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) throws IOException {
InputStream in = null;
try {
// 這里注意椅贱,如果自己在core-site.xml中指定了端口號女器,在這里就一定要輸入自己指定的端口號
// "hdfs://localhost:9000/user/henvealf/output4/part-r-00000"
in = new URL(args[0]).openStream();
// 使用下面的類的靜態(tài)方法就能夠很方便的將輸入流中的數(shù)據(jù)輸出到指定的輸出流丧诺,這里是屏幕標準輸出
// 第三個參數(shù)是設定流的緩沖區(qū)的大小蜘腌,
// 第四個參數(shù)表示在復制數(shù)據(jù)結束后是否關閉流芯急。
// 我們在下面自己關閉輸入流伺绽,而標準輸出流并不需要關閉
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
//最后不管程序有沒有正常結束,都要將輸入流關閉。
IOUtils.closeStream(in);
}
}
}
簡單的運行:
% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
使用FileSystem API 讀取數(shù)據(jù)
前面已經(jīng)說過乔妈,當無法使用 URLStreamHandlerFactory 的時候身隐,就需要使用 FileSystem API 來打開一個文件的輸入流玖绿。
在 Hadoop 文件系統(tǒng)中,一個文件代表使用一個 Path 對象來代表悬包。也就是文件的路勁你可以將他認為是 Hadoop 文件系統(tǒng)的一個 URI棵譬,比如:
hdfs://localhost/user/tom/quangle.txt
FileSystem 是文件系統(tǒng) API 中的一個抽象類,所以第一步就是檢索到你想要使用的文件系統(tǒng)的實例葵腹,這里是HDFS,為了得到他带欢,F(xiàn)ileSystem 提供了一些靜態(tài)工廠方法逗宜。
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String name) throws IOException
一個 Configuration 對象代表一個客戶端或者服務定義的配置诲泌,通過讀取classpath中的配置文件得到辫愉,比如 etc/hadoop/core-site.xml。
- 第一個方法只傳入了一個 Configuration 對象,然后返回一個默認的文件系統(tǒng)(在core-site.xml中定義的)。
- 第二個方法通過給定的 URI 模式和授權決定使用哪一個文件系統(tǒng)处面,如果所指定的文件系統(tǒng)找不到野揪,就使用默認的文件系統(tǒng)挣惰。
- 第三個方法給定一個用戶名,這個在控制上下文的安全中是非常重要的饿幅。
在一些情況下磕秤,你可能想要檢索并獲得一個本地文件系統(tǒng)的實例殖侵。這里有一個很簡單的方法:
public static LocalFileSystem getLocal(Configuration conf) throws IOException
當你得到了一個文件系統(tǒng)的實例贸呢,我們就需要調(diào)用 open()方法打開一個文件的文件流。
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException
第一個方法使用的緩沖區(qū)默認大小為4KB拢军。
將他們放在一起贮尉,我們就可以重寫 3-1 了,見3-2:
例子 3-2
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
/**
* 使用FileSystem API讀取HDFS中的數(shù)據(jù)朴沿,
* 如同在URLCat中提到的,有時候調(diào)用不到URLStreamHandlerFactory,這時候就可以使用FileSystem了败砂。
* 在Hadoop文件系統(tǒng)中的文件是使用Path對象來代表的赌渣,就像 hdfs://localhost/user/tom/quangle.txt。
*
* Created by henvealf on 16-9-23.
*/
public class FileSystemCat {
public static void main(String[] args) throws IOException {
String uri = args[0];
// 實例化一個代表配置的對象昌犹,使用默認的配置文件坚芜。
Configuration conf = new Configuration();
// 得到相應文件系統(tǒng)的引用
FileSystem fs = FileSystem.get(URI.create(uri),conf);
InputStream in = null;
try {
// 真正的打開文件系統(tǒng)中的文件路徑,并得到輸出流斜姥,實際上是一個FSDataInputStream
// 關于FSDataInputStream請看FileSystemDoubleCat
in = fs.open(new Path(uri));
// 下面和URLCat相同
IOUtils.copyBytes(in, System.out, 4069, false);
} finally {
IOUtils.closeStream(in);
}
}
}
使用下面的命令運行程序:
% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty
Tree The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
FSDataInputStream
在 FilSystem中的open() 方法事實上返回的是一個 FSDataInputSteam 而不是 java.io 類鸿竖。這個類是一個特殊的能夠支持隨機存取的 java.io.DataInputSteam沧竟,所以你能讀取流中的任何部分。
package org.apache.hadoop.fs
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable {
}
Seekable 接口允許尋找文件中的一個位置指針缚忧,并且提供了一個得到當前距離文件開始的偏移量的方法--getPos()
public interface Seekable {
void seek(long pos) throws IOException;
long getPos() throws IOException;
}
使用一個超出文件長度的指針作為參數(shù)調(diào)用 seek()的話悟泵,會拋出 IOException。他不像 java.io.InputStream 中的 skip() 方法闪水,指針必須指向當前指針位置之后糕非,seek()能夠在文件中任意的移動。
看下面的一個例子球榆。向文件中寫兩次數(shù)據(jù):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.net.URI;
/**
* FSDataInputStream的使用:
* 他是繼承于標準IO的DataInputStream朽肥。
* 實現(xiàn)了Seekable接口,于是乎他就能夠支持隨機存取,
* 兩個方法:
* void seek(long pos) throws IOException;
* long getPos() throws IOException;
* 還實現(xiàn)了PositionedReadable接口持钉,所以你可以使用一個偏移量衡招,這樣你就能用他來讀取到流中的任何部分。
* int read(long position, byte[] buffer, int offset, int length)
* throws IOException;
* void readFully(long position, byte[] buffer, int offset, int length)
* throws IOException;
* public void readFully(long position, byte[] buffer)
* throws IOException;
*
* Created by henvealf on 16-9-23.
*/
public class FileSystemDoubleCat {
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri) ,conf);
FSDataInputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 1024, false);
in.seek(0);
IOUtils.copyBytes(in, System.out, 1024, false);
byte[] b3 = new byte[3];
byte[] b33 = new byte[6];
// 當都使用這個參數(shù)時每强,可以看出下面兩個方法沒有區(qū)別
in.read(1,b3,0,3);
in.readFully(1,b3,0,3);
// 當只使用這兩個參數(shù)始腾,就會從平position開始讀數(shù)據(jù),直到填充滿Buffer舀射,
// 當讀的過程中遇到EOF窘茁,就會拋出異常。
in.readFully(1,b33);
System.out.println(new String(b3));
System.out.println(new String(b33));
} finally {
IOUtils.closeStream(in);
}
}
}
read方法的參數(shù)解釋:
- long position : 文件里的位置指針
- byte[] buffer : 存放數(shù)據(jù)的緩沖區(qū)
- int offset: 緩沖區(qū)里的偏移量脆烟,意思是讀取緩沖區(qū)時的起始位置山林。
- int length: 放在buffer中數(shù)據(jù)的長度,并不必須等于緩沖區(qū)的長度。
這些方法在維護當前偏移量的時候是線程安全的邢羔。所以當讀取文件的主體內(nèi)容時驼抹,他也能同時夠很方便的讀取文件的其他部分--比如元數(shù)據(jù)。
最后要注意 seek() 方法的使用拜鹤,如果兩次移動之間的距離過長框冀,會影響到文件讀取的效率。
寫數(shù)據(jù)
package com.henvealf.learn.hadoop.filesystem.datawrite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
import java.net.URI;
/**
* 將本地的文件拷貝到HDFS文件系統(tǒng)中
*
* FSDataOutputStream
* FileSystem方法會返回一個FSDataOutputStream,
* FSDataOutputStream里有一個能都得到當前文件指針的方法敏簿。
* long getPos() throws IOException;
* 他只有這個方法明也,沒有seek方法去任意移動指針。
*
* Directories //目錄
* FileSystem 提供一個創(chuàng)建目錄的方法:
* boolean mkdirs(Path f) throws IOException;
* 這個方法會自動創(chuàng)建不存在的父級目錄惯裕。如果創(chuàng)建目錄成功就返回true温数。
* 不過,你沒必要單獨來創(chuàng)建不存在的目錄蜻势,因為create()方法也會自動將不存在的父級目錄創(chuàng)建好撑刺。
* Created by henvealf on 16-9-23.
*/
public class FileCopyWithProcess {
public static void main(String[] args) throws IOException {
String localSrc = args[0];
String dst = args[1];
// 使用Java類庫獲取本地文件輸入流
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
// 獲取 FileSystem 對象
FileSystem fs = FileSystem.get(URI.create(dst),conf);
// 使用Hadoop的文件系統(tǒng)對象獲取輸入文件流。
// 如果目錄與文件不存在握玛,就自動創(chuàng)建目錄與文件够傍。
// 一個回調(diào)接口甫菠,Progressable,能夠告訴你,你的文件寫入進度冕屯。
OutputStream out = fs.create(new Path(dst),
() -> System.out.print("."));
// 開始寫入
IOUtils.copyBytes(in, out, 4096, true);
// 還有一個
// public FSDataOutputStream append(Path f) throws IOException
// 用于在已有的文件最后追加內(nèi)容.可以使用它來生成無邊界的文件寂诱,例如日志文件。
// 該方法在并沒有在所有的 Hadoop 文件系統(tǒng)中實現(xiàn)愕撰,比如HDFS中有刹衫,而S3中就沒有。
}
}
FSDataOutputStream
FileSystem 的 create() 方法返回一個 FSOutputSteam,和輸入流一樣搞挣,他也有一個能夠查詢當前文件指針所在的位置:
package org.apache.hadoop.fs
public class FSDataOutputStream extends DataOutputStream implements Syncable {
public long getPos() throws IOException {
}
}
不過輸入流沒有 seek 方法带迟,輸出流自允許從頭順序?qū)懟蛘邚囊呀?jīng)存在的文件結尾追加。
Directories 目錄
FileSystem 提供一個創(chuàng)建目錄的方法:
- boolean mkdirs(Path f) throws IOException;
這個方法會自動創(chuàng)建不存在的父級目錄囱桨。如果創(chuàng)建目錄成功就返回true仓犬。
不過,你沒必要單獨來創(chuàng)建不存在的目錄舍肠,因為create()方法也會自動將不存在的父級目錄創(chuàng)建好搀继。
查詢文件系統(tǒng)
文件元數(shù)據(jù):FileStatus
package com.henvealf.learn.hadoop.filesystem.dataquery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import java.io.IOException;
import java.io.OutputStream;
/**
* 查詢文件系統(tǒng)中的文件的元數(shù)據(jù),意思是找到某一個文件或者目錄翠语,以及他們的狀態(tài)
* FileStatus中封裝了文件系統(tǒng)中文件與目錄的元數(shù)據(jù)叽躯。
* 如果查找的文件不存在,就會拋出FileNotFoundException異常肌括。
* 為了處理這種情況点骑,可以在使用文件的時候使用boolean exists(Path path) throws IOException 方法。
* Created by henvealf on 16-9-23.
*/
public class ShowFileStatusTest {
public static void main(String[] args) throws IOException {
MiniDFSCluster cluster;
FileSystem fs ;
Configuration conf = new Configuration();
if (System.getProperty("test.build.data") == null) {
System.setProperty("test.build.data", "/tmp");
}
cluster = new MiniDFSCluster.Builder(conf).build();
fs = cluster.getFileSystem();
OutputStream out = fs.create(new Path("dir/file"));
out.write("content".getBytes("UTF-8"));
out.close();
System.out.println("------初始化成功------");
System.out.println("-----查看文件狀態(tài)------");
Path file = new Path("dir/file");
FileStatus stat = fs.getFileStatus(file);
printFileStatus(stat);
System.out.println("----查看目錄狀態(tài)-------");
Path dir = new Path("dir");
stat = fs.getFileStatus(dir);
printFileStatus(stat);
if (fs != null) {
fs.close();
}
if(cluster != null) {
cluster.shutdown();
}
System.out.println("-----關閉資源成功-----");
//
}
public static void printFileStatus(FileStatus stat) {
System.out.println("stat.getPath().toUri().getPath() > " + stat.getPath().toUri().getPath());
System.out.println("stat.isDirectory() > " + stat.isDirectory());
System.out.println("stat.getLen() > " + stat.getLen());
System.out.println("stat.getModificationTime() > " + stat.getModificationTime());
System.out.println("stat.getReplication() > " + stat.getReplication());
System.out.println("stat.getBlockSize() > " + stat.getBlockSize());
System.out.println("stat.getOwner() > " + stat.getOwner());
System.out.println("stat.getGroup() > " + stat.getGroup());
System.out.println("stat.getPermission().toString() > " + stat.getPermission().toString());
}
}
不多說谍夭,就是查看文件的原數(shù)據(jù)用黑滴。
文件列表
列出一個目錄中所有文件和目錄的信息〗羲鳎看代碼中
package com.henvealf.learn.hadoop.filesystem.dataquery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
/**
* 列出一個目錄中的文件以及文件夾袁辈。
* 使用SystemFile的listStatus方法
* 一共重載了四個方法
* 參數(shù)分別為:
* Path f > 列出該目錄下的所有文件和目錄信息
* Path f, PathFilter filter > 在列出之前使用過濾器篩選出想要的結果。
* Path[] files > 同時列出若干個文件下的文件和目錄信息
* Path[] files, PathFilter filter >
* 他們都返回一個 FileStatus[]
* Created by henvealf on 16-9-24.
*/
public class ListStatus {
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
// 根據(jù)路徑和配置獲得相應的文件系統(tǒng)引用
FileSystem fs = FileSystem.get(URI.create(uri), conf);
// 一個Path數(shù)組
Path[] paths = new Path[args.length];
for (int i = 0; i < paths.length; i++) {
paths[i] = new Path(args[i]);
}
// 列出目錄下所有文件的狀態(tài)
FileStatus[] status = fs.listStatus(paths);
// 使用工具類將FileStatus[]轉(zhuǎn)換為Path[], 數(shù)組
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
}
文件模式匹配
我們常常需要對多個文件進行相同的操作珠漂。相比于挨個的將文件的路徑添加到程序中晚缩,我們可以使用通配符來自動將匹配的文件路徑放入我們的代碼中。也就是一團文件(globbin)媳危,F(xiàn)ileSystem 為此提供了兩個方法:
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
可以發(fā)現(xiàn)他返回一個 FileStatus 數(shù)組橡羞。除此之外還有一個 PathFilter 選項來進行進一步的過濾。
Hadoop 支持的路徑通配符和Unix bash shell 相同济舆。
PathFiler 路徑過濾器
使用通配符選擇多個文件有時候并不能滿足我們的需求。這時候我們就可以使用上面提到的 PathFilter 選項了莺债。
他是一個接口類:
package org.apache.hadoop.fs
public interface PathFilter {
boolean accept(Path path);
}
這個類和Java類庫中的FileFilter很相似滋觉。
下面是他的一個實現(xiàn):
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
/**
* PathFilter接口
* 使用該接口能夠讓我們使用正則表達式來篩選出想要的Path签夭。
*
* 刪除數(shù)據(jù)
* 使用
* Created by henvealf on 16-9-24.
*
*
*/
public class RegexExcludePathFilter implements PathFilter{
private final String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
return !path.toString().matches(regex);
}
// 使用實例
// fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$"))
// 將會得到 /2007/12/13
}
該實現(xiàn)類會進一步過濾掉那些滿足了該正則表達式的路徑。
刪除文件
public boolean delete(Path f, boolean recursive) throws IOException
使用該方法就可以刪除文件或者目錄椎侠。
第二個參數(shù)是控制刪除文件時是或否遞歸刪除其中的所有目錄文件第租。同 rm -r 屬性。