HDFS的java操作
hdfs在生產(chǎn)應(yīng)用中主要是客戶端的開發(fā)移稳,其核心步驟是從hdfs提供的api中構(gòu)造一個(gè)HDFS的訪問(wèn)客戶端對(duì)象,然后通過(guò)該客戶端對(duì)象操作(增刪改查)HDFS上的文件
7.1 搭建開發(fā)環(huán)境
1会油、引入依賴
|
|
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.1</version>
</dependency>
|
|
注:如需手動(dòng)引入jar包个粱,hdfs的jar包----hadoop的安裝目錄的share下
2、window下開發(fā)的說(shuō)明
建議在linux下進(jìn)行hadoop應(yīng)用的開發(fā)钞啸,不會(huì)存在兼容性問(wèn)題几蜻。如在window上做客戶端應(yīng)用開發(fā),需要設(shè)置以下環(huán)境:
A体斩、在windows的某個(gè)目錄下解壓一個(gè)hadoop的安裝包
B梭稚、將安裝包下的lib和bin目錄用對(duì)應(yīng)windows版本平臺(tái)編譯的本地庫(kù)替換
C、在window系統(tǒng)中配置HADOOP_HOME指向你解壓的安裝包
D絮吵、在windows系統(tǒng)的path變量中加入hadoop的bin目錄
7.2 獲取api中的客戶端對(duì)象
在java中操作hdfs弧烤,首先要獲得一個(gè)客戶端實(shí)例
|
Configuration conf = new Configuration()
FileSystem fs = FileSystem.get(conf)
|
而我們的操作目標(biāo)是HDFS,所以獲取到的fs對(duì)象應(yīng)該是DistributedFileSystem的實(shí)例蹬敲;
get方法是從何處判斷具體實(shí)例化那種客戶端類呢暇昂?
——從conf中的一個(gè)參數(shù) fs.defaultFS的配置值判斷;
如果我們的代碼中沒(méi)有指定fs.defaultFS伴嗡,并且工程classpath下也沒(méi)有給定相應(yīng)的配置急波,conf中的默認(rèn)值就來(lái)自于hadoop的jar包中的core-default.xml,默認(rèn)值為: file:///瘪校,則獲取的將不是一個(gè)DistributedFileSystem的實(shí)例澄暮,而是一個(gè)本地文件系統(tǒng)的客戶端對(duì)象
7.3 DistributedFileSystem實(shí)例****對(duì)象所具備的方法
7.4 HDFS客戶端操作數(shù)據(jù)代碼示例:
hdfs dfsadmin -report查看狀態(tài)
7.4.1 文件的增刪改查
|
public class HdfsClient {
FileSystem fs = null;
@Before
public void init() throws Exception {
// 構(gòu)造一個(gè)配置參數(shù)對(duì)象,設(shè)置一個(gè)參數(shù):我們要訪問(wèn)的hdfs的URI
// 從而FileSystem.get()方法就知道應(yīng)該是去構(gòu)造一個(gè)訪問(wèn)hdfs文件系統(tǒng)的客戶端阱扬,以及hdfs的訪問(wèn)地址
// new Configuration();的時(shí)候泣懊,它就會(huì)去加載jar包中的hdfs-default.xml
// 然后再加載classpath下的hdfs-site.xml
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
/**
- 參數(shù)優(yōu)先級(jí): 1、客戶端代碼中設(shè)置的值 2麻惶、classpath下的用戶自定義配置文件 3馍刮、然后是服務(wù)器的默認(rèn)配置
*/
conf.set("dfs.replication", "3");
// 獲取一個(gè)hdfs的訪問(wèn)客戶端,根據(jù)參數(shù)窃蹋,這個(gè)實(shí)例應(yīng)該是DistributedFileSystem的實(shí)例
// fs = FileSystem.get(conf);
// 如果這樣去獲取卡啰,那conf里面就可以不要配"fs.defaultFS"參數(shù)静稻,而且,這個(gè)客戶端的身份標(biāo)識(shí)已經(jīng)是hadoop用戶
fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
}
/**
往hdfs上傳文件
@throws Exception
*/
@Test
public void testAddFileToHdfs() throws Exception {
// 要上傳的文件所在的本地路徑
Path src = new Path("g:/redis-recommend.zip");
// 要上傳到hdfs的目標(biāo)路徑
Path dst = new Path("/aaa");
fs.copyFromLocalFile(src, dst);
fs.close();
}
/**
從hdfs中復(fù)制文件到本地文件系統(tǒng)
@throws IOException
@throws IllegalArgumentException
*/
@Test
public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
fs.close();
}
@Test
public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
// 創(chuàng)建目錄
fs.mkdirs(new Path("/a1/b1/c1"));
// 刪除文件夾 碎乃,如果是非空文件夾姊扔,參數(shù)2必須給值true
fs.delete(new Path("/aaa"), true);
// 重命名文件或文件夾
fs.rename(new Path("/a1"), new Path("/a2"));
}
/**
查看目錄信息惠奸,只顯示文件
@throws IOException
@throws IllegalArgumentException
@throws FileNotFoundException
*/
@Test
public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
// 思考:為什么返回迭代器梅誓,而不是List之類的容器
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println(fileStatus.getPath().getName());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getLen());
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation bl : blockLocations) {
System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
String[] hosts = bl.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("--------------為angelababy打印的分割線--------------");
}
}
/**
查看文件及文件夾信息
@throws IOException
@throws IllegalArgumentException
@throws FileNotFoundException
*/
@Test
public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
String flag = "d-- ";
for (FileStatus fstatus : listStatus) {
if (fstatus.isFile()) flag = "f-- ";
System.out.println(flag + fstatus.getPath().getName());
}
}
}
|
7.4.2 通過(guò)流的方式訪問(wèn)hdfs
|
/**
- 相對(duì)那些封裝好的方法而言的更底層一些的操作方式*
- 上層那些mapreduce spark等運(yùn)算框架,去hdfs中獲取數(shù)據(jù)的時(shí)候佛南,就是調(diào)的這種底層的api*
- @author*
- /
public class StreamAccess {
FileSystem fs = null;
@Before
public void init() throws Exception {
Configuration conf = new Configuration();
fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
}
@Test
public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
//先獲取一個(gè)文件的輸入流----針對(duì)hdfs上的
FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
//再構(gòu)造一個(gè)文件的輸出流----針對(duì)本地的
FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
//再將輸入流中數(shù)據(jù)傳輸?shù)捷敵隽?/em>
IOUtils.copyBytes(in, out, 4096);
}
/**
- hdfs支持隨機(jī)定位進(jìn)行文件讀取梗掰,而且可以方便地讀取指定長(zhǎng)度*
- 用于上層分布式運(yùn)算框架并發(fā)處理數(shù)據(jù)*
- @throws IllegalArgumentException*
- @throws IOException*
/
@Test
public void testRandomAccess() throws IllegalArgumentException, IOException{
//先獲取一個(gè)文件的輸入流----針對(duì)hdfs上的
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
//可以將流的起始偏移量進(jìn)行自定義
in.seek(22);
//再構(gòu)造一個(gè)文件的輸出流----針對(duì)本地的
FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
IOUtils.copyBytes(in,out,19L,true);
}
/**
- 顯示hdfs上文件的內(nèi)容*
- @throws IOException*
- @throws IllegalArgumentException*
/
@Test
public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
IOUtils.copyBytes(in, System.out, 1024);
}
}
|
7.4.3 場(chǎng)景編程
在mapreduce 、spark等運(yùn)算框架中嗅回,有一個(gè)核心思想就是將運(yùn)算移往數(shù)據(jù)及穗,或者說(shuō),就是要在并發(fā)計(jì)算中盡可能讓運(yùn)算本地化绵载,這就需要獲取數(shù)據(jù)所在位置的信息并進(jìn)行相應(yīng)范圍讀取
以下模擬實(shí)現(xiàn):獲取一個(gè)文件的所有block位置信息埂陆,然后讀取指定block中的內(nèi)容
|
@Test
public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10"));
//拿到文件信息
FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10"));
//獲取這個(gè)文件的所有block的信息
BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen());
//第一個(gè)block的長(zhǎng)度
long length = fileBlockLocations[0].getLength();
//第一個(gè)block的起始偏移量
long offset = fileBlockLocations[0].getOffset();
System.out.println(length);
System.out.println(offset);
//獲取第一個(gè)block寫入輸出流
//IOUtils.copyBytes(in, System.out, (int)length);
byte[] b = new byte[4096];
FileOutputStream os = new FileOutputStream(new File("d:/block0"));
while(in.read(offset, b, 0, 4096)!=-1){
os.write(b);
offset += 4096;
if(offset>=length) return;
};
os.flush();
os.close();
in.close();
}
|