Accessing HDFS with the Hadoop client API
1. Create a Java project
2. Import the Hadoop libraries
Note: I created the Java project with IDEA and manage the jars with Maven.
@Test
public void readFile() throws Exception {
    System.out.println("HELLO");
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    URL url = new URL("hdfs://20.18.5.1:8020/user/hadoop/hadoop/test.txt");
    URLConnection conn = url.openConnection();
    InputStream is = conn.getInputStream();
    byte[] buf = new byte[is.available()];
    is.read(buf);
    is.close();
    String str = new String(buf);
    System.out.println(str);
}
When running the test, since Hadoop pulls in a lot of dependencies, my plan was to add them one at a time as they were actually needed. The JVM does not know the hdfs protocol out of the box, so the line that constructs the URL cannot succeed unless URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); is called first. FsUrlStreamHandlerFactory lives in hadoop-common.jar, so at first I only added hadoop-common:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.2.1</version>
</dependency>
But even with that dependency in place, the following error was still thrown:
java.net.MalformedURLException: unknown protocol: hdfs
    at java.net.URL.<init>(URL.java:593)
    at java.net.URL.<init>(URL.java:483)
    at java.net.URL.<init>(URL.java:432)
    at TestHDFS.readFile(TestHDFS.java:17)
So I added the related dependencies one at a time, and everything ran normally once the following dependency was in place. hadoop-client already pulls in hadoop-common transitively, so the hadoop-common dependency above can be removed:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.1</version>
</dependency>
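One thing to keep in mind with this URL-based approach: java.net.URL.setURLStreamHandlerFactory may only be called once per JVM, and a second call throws an Error. If several tests use hdfs:// URLs, a common pattern is to register the factory in a static initializer instead of inside each test. A minimal sketch, reusing the TestHDFS class name from the stack trace above:

import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;

public class TestHDFS {
    // Register the hdfs:// handler exactly once, when the class is loaded;
    // a second call to setURLStreamHandlerFactory would throw an Error
    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    // ... test methods ...
}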
Servers, racks, machine rooms, and data centers
blade: blade server
rack: server rack (one rack holds 6 blade servers, and each rack has its own switch)
A machine room contains multiple racks, and the racks also exchange data with one another through switches.
A data center contains multiple machine rooms.
Java code for accessing HDFS through the Hadoop API
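For reference, the test methods below rely on roughly these imports (assuming Hadoop 3.x and JUnit 4 on the classpath):

import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;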
@Test
public void readFile() throws Exception {
    System.out.println("HELLO");
    // Read a file through java.net.URL with the HDFS stream handler registered
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    URL url = new URL("hdfs://20.18.5.1:8020/user/hadoop/hadoop/test.txt");
    URLConnection conn = url.openConnection();
    InputStream is = conn.getInputStream();
    byte[] buf = new byte[is.available()];
    is.read(buf);
    is.close();
    String str = new String(buf);
    System.out.println(str);
}
@Test
public void readFileByURL() throws Exception {
    System.out.println("HELLOURL");
    // Same idea, but open the stream directly from the URL
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    InputStream in = new URL("hdfs://20.18.5.1:8020/user/hadoop/hadoop/test.txt").openStream();
    byte[] buf = new byte[in.available()];
    in.read(buf);
    in.close();
    String str = new String(buf);
    System.out.println(str);
}
@Test
public void readFileByAPI() throws Exception {
    Configuration conf = new Configuration();
    // fs.default.name is the old key; fs.defaultFS is its current name
    conf.set("fs.default.name", "hdfs://20.18.5.1:8020/");
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/user/hadoop/hadoop/test.txt");
    FSDataInputStream fis = fs.open(path);
    byte[] buf = new byte[1024];
    int len = -1;
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // Copy in 1024-byte chunks until EOF
    while ((len = fis.read(buf)) != -1) {
        baos.write(buf, 0, len);
    }
    fis.close();
    baos.close();
    System.out.println("---------------------------------------");
    System.out.println(baos);
    System.out.println(new String(baos.toByteArray()));
    // The next two lines print the array reference, not the file contents
    System.out.println(baos.toByteArray());
    System.out.println(baos.toByteArray().toString());
    System.out.println("---------------------------------------");
}
@Test
public void readFileByAPI2() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://20.18.5.1:8020/");
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/user/hadoop/hadoop/test.txt");
    FSDataInputStream fis = fs.open(path);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // copyBytes drives the read loop; the final true closes both streams
    IOUtils.copyBytes(fis, baos, 1024, true);
    System.out.println(new String(baos.toByteArray()));
}
@Test
public void mkdir() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://20.18.5.1:8020/");
    FileSystem fs = FileSystem.get(conf);
    // mkdirs also creates any missing parent directories
    fs.mkdirs(new Path("/user/hadoop/myhadoop"));
}
@Test
public void putFile() throws Exception {
    Configuration conf = new Configuration();
    // URLStr is a String field holding the NameNode address, e.g. "hdfs://20.18.5.1:8020/"
    conf.set("fs.default.name", URLStr);
    FileSystem fs = FileSystem.get(conf);
    FSDataOutputStream fos = fs.create(new Path("/user/hadoop/myhadoop/a.txt"));
    fos.write("hello lll".getBytes());
    fos.close();
}
@Test
public void removeFile() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", URLStr);
    FileSystem fs = FileSystem.get(conf);
    // true makes the delete recursive
    fs.delete(new Path("/user/hadoop/myhadoop"), true);
}
@Test
public void appendFile() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", URLStr);
    FileSystem fs = FileSystem.get(conf);
    // Append to an existing file rather than overwriting it
    FSDataOutputStream fos = fs.append(new Path("/user/hadoop/myhadoop/a.txt"));
    fos.write("hello xxx".getBytes());
    fos.close();
}
@Test
public void test1() throws Exception {
    // new Configuration() picks up core-site.xml / hdfs-site.xml from the classpath,
    // so the NameNode address is not set in code here
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream fis = fs.open(new Path("hdfs://nn1/user/hadoop/hello.txt"));
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    IOUtils.copyBytes(fis, baos, 1024);
    baos.close();
    fis.close();
    System.out.println(new String(baos.toByteArray()));
}
@Test
public void test2() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Create (or overwrite) a file and write a short string to it
    FSDataOutputStream fout = fs.create(new Path("hdfs://nn1/user/hadoop/a.txt"));
    fout.write("hello boy".getBytes());
    fout.close();
}
@Test
public void test3() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // create(path, overwrite, bufferSize, replication, blockSize):
    // replication factor 2, block size 1024 bytes
    FSDataOutputStream fout = fs.create(new Path("hdfs://nn1/user/hadoop/a.txt"),
            true, 1024, (short) 2, 1024);
    FileInputStream fios = new FileInputStream("e:/a.txt");
    IOUtils.copyBytes(fios, fout, 1024);
    fout.close();
    fios.close();
}
Configuring the block size: the problem hit in test3 above and how to fix it
The smallest block size Hadoop accepts must be a multiple of 512, because data written to HDFS is checksummed in 512-byte chunks, so the block size has to be at least 512 bytes and a multiple of 512. test3 asks for a 1024-byte block size, which is smaller than the NameNode's default minimum (dfs.namenode.fs-limits.min-block-size, 1 MB by default), so the create call is rejected until that limit is lowered.
Add the following property to hdfs-site.xml on the nodes to change the minimum block size, and restart HDFS with start-dfs.sh afterwards:
<property>
    <name>dfs.namenode.fs-limits.min-block-size</name>
    <value>1024</value>
</property>
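After the restart, a quick way to confirm that files can now be created with the small block size is to read back the block size of the file written by test3. This is just a sketch under the same assumptions as the tests above (classpath configuration pointing at the cluster); checkBlockSize is an illustrative name, and FileStatus comes from org.apache.hadoop.fs:

@Test
public void checkBlockSize() throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // getFileStatus reports the block size the file was created with;
    // after test3 this should print 1024
    FileStatus status = fs.getFileStatus(new Path("hdfs://nn1/user/hadoop/a.txt"));
    System.out.println(status.getBlockSize());
}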