1:POM文件引入jar包
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>3.1.2</version>
</dependency>
2:設(shè)置HDFS配置類(注入HdfsService Bean)
import com.kafka.KafkaBase.service.HdfsService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
HDFS相關(guān)配置
@author zhenmin.gu
-
@since 1.0.0
*/
@Configuration
public class HdfsConfig {private String defaultHdfsUri = "hdfs://ns1";
@Bean
public HdfsService getHbaseService(){
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();return new HdfsService(conf,defaultHdfsUri);
}
}
3:讀寫操作API
import com.alibaba.fastjson.JSON;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
HDFS相關(guān)的基本操作
-
@since 1.0.0
*/
public class HdfsService {private Logger logger = LoggerFactory.getLogger(HdfsService.class);
private Configuration conf = null;/**
- 默認(rèn)的HDFS路徑
*/
private String defaultHdfsUri;
public HdfsService(Configuration conf,String defaultHdfsUri) {
this.conf = conf;
this.defaultHdfsUri = defaultHdfsUri;
}/**
- 獲取HDFS文件系統(tǒng)
- @return org.apache.hadoop.fs.FileSystem
*/
private FileSystem getFileSystem() throws IOException {
return FileSystem.get(conf);
}
/**
創(chuàng)建HDFS目錄
@since 1.0.0
@param path HDFS的相對(duì)目錄路徑毛肋,比如:/testDir
-
@return boolean 是否創(chuàng)建成功
*/
public boolean mkdir(String path){
//如果目錄已經(jīng)存在,則直接返回
if(checkExists(path)){
return true;
}else{
FileSystem fileSystem = null;try { fileSystem = getFileSystem(); //最終的HDFS文件目錄 String hdfsPath = generateHdfsPath(path); //創(chuàng)建目錄 return fileSystem.mkdirs(new Path(hdfsPath)); } catch (IOException e) { logger.error(MessageFormat.format("創(chuàng)建HDFS目錄失敗屋剑,path:{0}",path),e); return false; }finally { close(fileSystem); }
}
}
/**
- 上傳文件至HDFS
- @since 1.0.0
- @param srcFile 本地文件路徑润匙,比如:D:/test.txt
- @param dstPath HDFS的相對(duì)目錄路徑,比如:/testDir
*/
public void uploadFileToHdfs(String srcFile, String dstPath){
this.uploadFileToHdfs(false, true, srcFile, dstPath);
}
/**
上傳文件至HDFS
@since 1.0.0
@param delSrc 是否刪除本地文件
@param overwrite 是否覆蓋HDFS上面的文件
@param srcFile 本地文件路徑唉匾,比如:D:/test.txt
-
@param dstPath HDFS的相對(duì)目錄路徑孕讳,比如:/testDir
*/
public void uploadFileToHdfs(boolean delSrc, boolean overwrite, String srcFile, String dstPath){
//源文件路徑
Path localSrcPath = new Path(srcFile);
//目標(biāo)文件路徑
Path hdfsDstPath = new Path(generateHdfsPath(dstPath));FileSystem fileSystem = null;
try {
fileSystem = getFileSystem();fileSystem.copyFromLocalFile(delSrc,overwrite,localSrcPath,hdfsDstPath);
} catch (IOException e) {
logger.error(MessageFormat.format("上傳文件至HDFS失敗,srcFile:{0},dstPath:{1}",srcFile,dstPath),e);
}finally {
close(fileSystem);
}
}
/**
判斷文件或者目錄是否在HDFS上面存在
@since 1.0.0
@param path HDFS的相對(duì)目錄路徑巍膘,比如:/testDir厂财、/testDir/a.txt
-
@return boolean
*/
public boolean checkExists(String path){
FileSystem fileSystem = null;
try {
fileSystem = getFileSystem();//最終的HDFS文件目錄 String hdfsPath = generateHdfsPath(path); //創(chuàng)建目錄 return fileSystem.exists(new Path(hdfsPath));
} catch (IOException e) {
logger.error(MessageFormat.format("'判斷文件或者目錄是否在HDFS上面存在'失敗,path:{0}",path),e);
return false;
}finally {
close(fileSystem);
}
}
/**
獲取HDFS上面的某個(gè)路徑下面的所有文件或目錄(不包含子目錄)信息
@since 1.0.0
@param path HDFS的相對(duì)目錄路徑峡懈,比如:/testDir
-
@return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
*/
public List<Map<String,Object>> listFiles(String path, PathFilter pathFilter){
//返回?cái)?shù)據(jù)
List<Map<String,Object>> result = new ArrayList<>();//如果目錄已經(jīng)存在璃饱,則繼續(xù)操作
if(checkExists(path)){
FileSystem fileSystem = null;try { fileSystem = getFileSystem(); //最終的HDFS文件目錄 String hdfsPath = generateHdfsPath(path); FileStatus[] statuses; //根據(jù)Path過濾器查詢 if(pathFilter != null){ statuses = fileSystem.listStatus(new Path(hdfsPath),pathFilter); }else{ statuses = fileSystem.listStatus(new Path(hdfsPath)); } if(statuses != null){ for(FileStatus status : statuses){ //每個(gè)文件的屬性 Map<String,Object> fileMap = new HashMap<>(2); fileMap.put("path",status.getPath().toString()); fileMap.put("isDir",status.isDirectory()); result.add(fileMap); } } } catch (IOException e) { logger.error(MessageFormat.format("獲取HDFS上面的某個(gè)路徑下面的所有文件失敗,path:{0}",path),e); }finally { close(fileSystem); }
}
return result;
}
- 默認(rèn)的HDFS路徑
/**
* 從HDFS下載文件至本地
*
* @since 1.0.0
* @param srcFile HDFS的相對(duì)目錄路徑逮诲,比如:/testDir/a.txt
* @param dstFile 下載之后本地文件路徑(如果本地文件目錄不存在帜平,則會(huì)自動(dòng)創(chuàng)建)幽告,比如:D:/test.txt
*/
public void downloadFileFromHdfs(String srcFile, String dstFile){
//HDFS文件路徑
Path hdfsSrcPath = new Path(generateHdfsPath(srcFile));
//下載之后本地文件路徑
Path localDstPath = new Path(dstFile);
FileSystem fileSystem = null;
try {
fileSystem = getFileSystem();
fileSystem.copyToLocalFile(hdfsSrcPath,localDstPath);
} catch (IOException e) {
logger.error(MessageFormat.format("從HDFS下載文件至本地失敗,srcFile:{0},dstFile:{1}",srcFile,dstFile),e);
}finally {
close(fileSystem);
}
}
/**
* 打開HDFS上面的文件并返回 InputStream
*
* @since 1.0.0
* @param path HDFS的相對(duì)目錄路徑裆甩,比如:/testDir/c.txt
* @return FSDataInputStream
*/
public FSDataInputStream open(String path){
//HDFS文件路徑
Path hdfsPath = new Path(generateHdfsPath(path));
FileSystem fileSystem = null;
try {
fileSystem = getFileSystem();
return fileSystem.open(hdfsPath);
} catch (IOException e) {
logger.error(MessageFormat.format("打開HDFS上面的文件失敗冗锁,path:{0}",path),e);
}
return null;
}
/**
* 打開HDFS上面的文件并返回byte數(shù)組,方便Web端下載文件
* <p>new ResponseEntity<byte[]>(byte數(shù)組, headers, HttpStatus.CREATED);</p>
* <p>或者:new ResponseEntity<byte[]>(FileUtils.readFileToByteArray(templateFile), headers, HttpStatus.CREATED);</p>
*
* @since 1.0.0
* @param path HDFS的相對(duì)目錄路徑嗤栓,比如:/testDir/b.txt
* @return FSDataInputStream
*/
public byte[] openWithBytes(String path){
//HDFS文件路徑
Path hdfsPath = new Path(generateHdfsPath(path));
FileSystem fileSystem = null;
FSDataInputStream inputStream = null;
try {
fileSystem = getFileSystem();
inputStream = fileSystem.open(hdfsPath);
return IOUtils.toByteArray(inputStream);
} catch (IOException e) {
logger.error(MessageFormat.format("打開HDFS上面的文件失敗冻河,path:{0}",path),e);
}finally {
if(inputStream != null){
try {
inputStream.close();
} catch (IOException e) {
// ignore
}
}
}
return null;
}
/**
* 打開HDFS上面的文件并返回String字符串
*
* @since 1.0.0
* @param path HDFS的相對(duì)目錄路徑,比如:/testDir/b.txt
* @return FSDataInputStream
*/
public String openWithString(String path){
//HDFS文件路徑
Path hdfsPath = new Path(generateHdfsPath(path));
FileSystem fileSystem = null;
FSDataInputStream inputStream = null;
try {
fileSystem = getFileSystem();
inputStream = fileSystem.open(hdfsPath);
return IOUtils.toString(inputStream, String.valueOf(Charset.forName("UTF-8")));
} catch (IOException e) {
logger.error(MessageFormat.format("打開HDFS上面的文件失敗茉帅,path:{0}",path),e);
}finally {
if(inputStream != null){
try {
inputStream.close();
} catch (IOException e) {
// ignore
}
}
}
return null;
}
/**
* 打開HDFS上面的文件并轉(zhuǎn)換為Java對(duì)象(需要HDFS上門的文件內(nèi)容為JSON字符串)
*
* @since 1.0.0
* @param path HDFS的相對(duì)目錄路徑叨叙,比如:/testDir/c.txt
* @return FSDataInputStream
*/
public <T extends Object> T openWithObject(String path, Class<T> clazz){
//1、獲得文件的json字符串
String jsonStr = this.openWithString(path);
//2堪澎、使用com.alibaba.fastjson.JSON將json字符串轉(zhuǎn)化為Java對(duì)象并返回
return JSON.parseObject(jsonStr, clazz);
}
/**
* 重命名
*
* @since 1.0.0
* @param srcFile 重命名之前的HDFS的相對(duì)目錄路徑擂错,比如:/testDir/b.txt
* @param dstFile 重命名之后的HDFS的相對(duì)目錄路徑,比如:/testDir/b_new.txt
*/
public boolean rename(String srcFile, String dstFile) {
//HDFS文件路徑
Path srcFilePath = new Path(generateHdfsPath(srcFile));
//下載之后本地文件路徑
Path dstFilePath = new Path(dstFile);
FileSystem fileSystem = null;
try {
fileSystem = getFileSystem();
return fileSystem.rename(srcFilePath,dstFilePath);
} catch (IOException e) {
logger.error(MessageFormat.format("重命名失敗樱蛤,srcFile:{0},dstFile:{1}",srcFile,dstFile),e);
}finally {
close(fileSystem);
}
return false;
}
/**
* 刪除HDFS文件或目錄
*
* @since 1.0.0
* @param path HDFS的相對(duì)目錄路徑钮呀,比如:/testDir/c.txt
* @return boolean
*/
public boolean delete(String path) {
//HDFS文件路徑
Path hdfsPath = new Path(generateHdfsPath(path));
FileSystem fileSystem = null;
try {
fileSystem = getFileSystem();
return fileSystem.delete(hdfsPath,true);
} catch (IOException e) {
logger.error(MessageFormat.format("刪除HDFS文件或目錄失敗,path:{0}",path),e);
}finally {
close(fileSystem);
}
return false;
}
/**
* 獲取某個(gè)文件在HDFS集群的位置
*
* @since 1.0.0
* @param path HDFS的相對(duì)目錄路徑昨凡,比如:/testDir/a.txt
* @return org.apache.hadoop.fs.BlockLocation[]
*/
public BlockLocation[] getFileBlockLocations(String path) {
//HDFS文件路徑
Path hdfsPath = new Path(generateHdfsPath(path));
FileSystem fileSystem = null;
try {
fileSystem = getFileSystem();
FileStatus fileStatus = fileSystem.getFileStatus(hdfsPath);
return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
} catch (IOException e) {
logger.error(MessageFormat.format("獲取某個(gè)文件在HDFS集群的位置失敗爽醋,path:{0}",path),e);
}finally {
close(fileSystem);
}
return null;
}
/**
* 將相對(duì)路徑轉(zhuǎn)化為HDFS文件路徑
*
* @since 1.0.0
* @param dstPath 相對(duì)路徑,比如:/data
* @return java.lang.String
*/
private String generateHdfsPath(String dstPath){
String hdfsPath = defaultHdfsUri;
if(dstPath.startsWith("/")){
hdfsPath += dstPath;
}else{
hdfsPath = hdfsPath + "/" + dstPath;
}
return hdfsPath;
}
/**
* close方法
*/
private void close(FileSystem fileSystem){
if(fileSystem != null){
try {
fileSystem.close();
} catch (IOException e) {
logger.error(e.getMessage());
}
}
}
}