Hadoop ha環(huán)境下的java api操作

1:POM苫亦;引入jar包
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.2</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>3.1.2</version>
</dependency>

2:設(shè)置連接池
import com.kafka.KafkaBase.service.HdfsService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**

  • HDFS相關(guān)配置

  • @author zhenmin.gu

  • @since 1.0.0
    */
    @Configuration
    public class HdfsConfig {

    private String defaultHdfsUri = "hdfs://ns1";

    @Bean
    public HdfsService getHbaseService(){
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();

     return new HdfsService(conf,defaultHdfsUri);
    

    }
    }

3:讀寫操作API

import com.alibaba.fastjson.JSON;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**

  • HDFS相關(guān)的基本操作

  • @since 1.0.0
    */
    public class HdfsService {

    private Logger logger = LoggerFactory.getLogger(HdfsService.class);
    private Configuration conf = null;

    /**

    • 默認(rèn)的HDFS路徑
      */
      private String defaultHdfsUri;

    public HdfsService(Configuration conf,String defaultHdfsUri) {
    this.conf = conf;
    this.defaultHdfsUri = defaultHdfsUri;
    }

    /**

    • 獲取HDFS文件系統(tǒng)
    • @return org.apache.hadoop.fs.FileSystem
      */
      private FileSystem getFileSystem() throws IOException {
      return FileSystem.get(conf);
      }

    /**

    • 創(chuàng)建HDFS目錄

    • @since 1.0.0

    • @param path HDFS的相對(duì)目錄路徑毛肋,比如:/testDir

    • @return boolean 是否創(chuàng)建成功
      */
      public boolean mkdir(String path){
      //如果目錄已經(jīng)存在,則直接返回
      if(checkExists(path)){
      return true;
      }else{
      FileSystem fileSystem = null;

       try {
           fileSystem = getFileSystem();
      
           //最終的HDFS文件目錄
           String hdfsPath = generateHdfsPath(path);
           //創(chuàng)建目錄
           return fileSystem.mkdirs(new Path(hdfsPath));
       } catch (IOException e) {
           logger.error(MessageFormat.format("創(chuàng)建HDFS目錄失敗屋剑,path:{0}",path),e);
           return false;
       }finally {
           close(fileSystem);
       }
      

      }
      }

    /**

    • 上傳文件至HDFS
    • @since 1.0.0
    • @param srcFile 本地文件路徑润匙,比如:D:/test.txt
    • @param dstPath HDFS的相對(duì)目錄路徑,比如:/testDir
      */
      public void uploadFileToHdfs(String srcFile, String dstPath){
      this.uploadFileToHdfs(false, true, srcFile, dstPath);
      }

    /**

    • 上傳文件至HDFS

    • @since 1.0.0

    • @param delSrc 是否刪除本地文件

    • @param overwrite 是否覆蓋HDFS上面的文件

    • @param srcFile 本地文件路徑唉匾,比如:D:/test.txt

    • @param dstPath HDFS的相對(duì)目錄路徑孕讳,比如:/testDir
      */
      public void uploadFileToHdfs(boolean delSrc, boolean overwrite, String srcFile, String dstPath){
      //源文件路徑
      Path localSrcPath = new Path(srcFile);
      //目標(biāo)文件路徑
      Path hdfsDstPath = new Path(generateHdfsPath(dstPath));

      FileSystem fileSystem = null;
      try {
      fileSystem = getFileSystem();

       fileSystem.copyFromLocalFile(delSrc,overwrite,localSrcPath,hdfsDstPath);
      

      } catch (IOException e) {
      logger.error(MessageFormat.format("上傳文件至HDFS失敗,srcFile:{0},dstPath:{1}",srcFile,dstPath),e);
      }finally {
      close(fileSystem);
      }
      }

    /**

    • 判斷文件或者目錄是否在HDFS上面存在

    • @since 1.0.0

    • @param path HDFS的相對(duì)目錄路徑巍膘,比如:/testDir厂财、/testDir/a.txt

    • @return boolean
      */
      public boolean checkExists(String path){
      FileSystem fileSystem = null;
      try {
      fileSystem = getFileSystem();

       //最終的HDFS文件目錄
       String hdfsPath = generateHdfsPath(path);
      
       //創(chuàng)建目錄
       return fileSystem.exists(new Path(hdfsPath));
      

      } catch (IOException e) {
      logger.error(MessageFormat.format("'判斷文件或者目錄是否在HDFS上面存在'失敗,path:{0}",path),e);
      return false;
      }finally {
      close(fileSystem);
      }
      }

    /**

    • 獲取HDFS上面的某個(gè)路徑下面的所有文件或目錄(不包含子目錄)信息

    • @since 1.0.0

    • @param path HDFS的相對(duì)目錄路徑峡懈,比如:/testDir

    • @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
      */
      public List<Map<String,Object>> listFiles(String path, PathFilter pathFilter){
      //返回?cái)?shù)據(jù)
      List<Map<String,Object>> result = new ArrayList<>();

      //如果目錄已經(jīng)存在璃饱,則繼續(xù)操作
      if(checkExists(path)){
      FileSystem fileSystem = null;

       try {
           fileSystem = getFileSystem();
      
           //最終的HDFS文件目錄
           String hdfsPath = generateHdfsPath(path);
      
           FileStatus[] statuses;
           //根據(jù)Path過濾器查詢
           if(pathFilter != null){
               statuses = fileSystem.listStatus(new Path(hdfsPath),pathFilter);
           }else{
               statuses = fileSystem.listStatus(new Path(hdfsPath));
           }
      
           if(statuses != null){
               for(FileStatus status : statuses){
                   //每個(gè)文件的屬性
                   Map<String,Object> fileMap = new HashMap<>(2);
      
                   fileMap.put("path",status.getPath().toString());
                   fileMap.put("isDir",status.isDirectory());
      
                   result.add(fileMap);
               }
           }
       } catch (IOException e) {
           logger.error(MessageFormat.format("獲取HDFS上面的某個(gè)路徑下面的所有文件失敗,path:{0}",path),e);
       }finally {
           close(fileSystem);
       }
      

      }

      return result;
      }

/**
 * 從HDFS下載文件至本地
 * 
 * @since 1.0.0
 * @param srcFile HDFS的相對(duì)目錄路徑逮诲,比如:/testDir/a.txt
 * @param dstFile 下載之后本地文件路徑(如果本地文件目錄不存在帜平,則會(huì)自動(dòng)創(chuàng)建)幽告,比如:D:/test.txt
 */
public void downloadFileFromHdfs(String srcFile, String dstFile){
    //HDFS文件路徑
    Path hdfsSrcPath = new Path(generateHdfsPath(srcFile));
    //下載之后本地文件路徑
    Path localDstPath = new Path(dstFile);

    FileSystem fileSystem = null;
    try {
        fileSystem = getFileSystem();

        fileSystem.copyToLocalFile(hdfsSrcPath,localDstPath);
    } catch (IOException e) {
        logger.error(MessageFormat.format("從HDFS下載文件至本地失敗,srcFile:{0},dstFile:{1}",srcFile,dstFile),e);
    }finally {
        close(fileSystem);
    }
}

/**
 * 打開HDFS上面的文件并返回 InputStream
 * 
 * @since 1.0.0
 * @param path HDFS的相對(duì)目錄路徑裆甩,比如:/testDir/c.txt
 * @return FSDataInputStream
 */
public FSDataInputStream open(String path){
    //HDFS文件路徑
    Path hdfsPath = new Path(generateHdfsPath(path));

    FileSystem fileSystem = null;
    try {
        fileSystem = getFileSystem();

        return fileSystem.open(hdfsPath);
    } catch (IOException e) {
        logger.error(MessageFormat.format("打開HDFS上面的文件失敗冗锁,path:{0}",path),e);
    }

    return null;
}

/**
 * 打開HDFS上面的文件并返回byte數(shù)組,方便Web端下載文件
 * <p>new ResponseEntity<byte[]>(byte數(shù)組, headers, HttpStatus.CREATED);</p>
 * <p>或者:new ResponseEntity<byte[]>(FileUtils.readFileToByteArray(templateFile), headers, HttpStatus.CREATED);</p>
 * 
 * @since 1.0.0
 * @param path HDFS的相對(duì)目錄路徑嗤栓,比如:/testDir/b.txt
 * @return FSDataInputStream
 */
public byte[] openWithBytes(String path){
    //HDFS文件路徑
    Path hdfsPath = new Path(generateHdfsPath(path));

    FileSystem fileSystem = null;
    FSDataInputStream inputStream = null;
    try {
        fileSystem = getFileSystem();
        inputStream = fileSystem.open(hdfsPath);

        return IOUtils.toByteArray(inputStream);
    } catch (IOException e) {
        logger.error(MessageFormat.format("打開HDFS上面的文件失敗冻河,path:{0}",path),e);
    }finally {
        if(inputStream != null){
            try {
                inputStream.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }

    return null;
}

/**
 * 打開HDFS上面的文件并返回String字符串
 * 
 * @since 1.0.0
 * @param path HDFS的相對(duì)目錄路徑,比如:/testDir/b.txt
 * @return FSDataInputStream
 */
public String openWithString(String path){
    //HDFS文件路徑
    Path hdfsPath = new Path(generateHdfsPath(path));

    FileSystem fileSystem = null;
    FSDataInputStream inputStream = null;
    try {
        fileSystem = getFileSystem();
        inputStream = fileSystem.open(hdfsPath);

        return IOUtils.toString(inputStream, String.valueOf(Charset.forName("UTF-8")));
    } catch (IOException e) {
        logger.error(MessageFormat.format("打開HDFS上面的文件失敗茉帅,path:{0}",path),e);
    }finally {
        if(inputStream != null){
            try {
                inputStream.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }

    return null;
}

/**
 * 打開HDFS上面的文件并轉(zhuǎn)換為Java對(duì)象(需要HDFS上門的文件內(nèi)容為JSON字符串)
 * 
 * @since 1.0.0
 * @param path HDFS的相對(duì)目錄路徑叨叙,比如:/testDir/c.txt
 * @return FSDataInputStream
 */
public <T extends Object> T openWithObject(String path, Class<T> clazz){
    //1、獲得文件的json字符串
    String jsonStr = this.openWithString(path);

    //2堪澎、使用com.alibaba.fastjson.JSON將json字符串轉(zhuǎn)化為Java對(duì)象并返回
    return JSON.parseObject(jsonStr, clazz);
}

/**
 * 重命名
 * 
 * @since 1.0.0
 * @param srcFile 重命名之前的HDFS的相對(duì)目錄路徑擂错,比如:/testDir/b.txt
 * @param dstFile 重命名之后的HDFS的相對(duì)目錄路徑,比如:/testDir/b_new.txt
 */
public boolean rename(String srcFile, String dstFile) {
    //HDFS文件路徑
    Path srcFilePath = new Path(generateHdfsPath(srcFile));
    //下載之后本地文件路徑
    Path dstFilePath = new Path(dstFile);

    FileSystem fileSystem = null;
    try {
        fileSystem = getFileSystem();

        return fileSystem.rename(srcFilePath,dstFilePath);
    } catch (IOException e) {
        logger.error(MessageFormat.format("重命名失敗樱蛤,srcFile:{0},dstFile:{1}",srcFile,dstFile),e);
    }finally {
        close(fileSystem);
    }

    return false;
}

/**
 * 刪除HDFS文件或目錄
 * 
 * @since 1.0.0
 * @param path HDFS的相對(duì)目錄路徑钮呀,比如:/testDir/c.txt
 * @return boolean
 */
public boolean delete(String path) {
    //HDFS文件路徑
    Path hdfsPath = new Path(generateHdfsPath(path));

    FileSystem fileSystem = null;
    try {
        fileSystem = getFileSystem();

        return fileSystem.delete(hdfsPath,true);
    } catch (IOException e) {
        logger.error(MessageFormat.format("刪除HDFS文件或目錄失敗,path:{0}",path),e);
    }finally {
        close(fileSystem);
    }

    return false;
}

/**
 * 獲取某個(gè)文件在HDFS集群的位置
 * 
 * @since 1.0.0
 * @param path HDFS的相對(duì)目錄路徑昨凡,比如:/testDir/a.txt
 * @return org.apache.hadoop.fs.BlockLocation[]
 */
public BlockLocation[] getFileBlockLocations(String path) {
    //HDFS文件路徑
    Path hdfsPath = new Path(generateHdfsPath(path));

    FileSystem fileSystem = null;
    try {
        fileSystem = getFileSystem();
        FileStatus fileStatus = fileSystem.getFileStatus(hdfsPath);

        return fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    } catch (IOException e) {
        logger.error(MessageFormat.format("獲取某個(gè)文件在HDFS集群的位置失敗爽醋,path:{0}",path),e);
    }finally {
        close(fileSystem);
    }

    return null;
}


/**
 * 將相對(duì)路徑轉(zhuǎn)化為HDFS文件路徑
 * 
 * @since 1.0.0
 * @param dstPath 相對(duì)路徑,比如:/data
 * @return java.lang.String
 */
private String generateHdfsPath(String dstPath){
    String hdfsPath = defaultHdfsUri;
    if(dstPath.startsWith("/")){
        hdfsPath += dstPath;
    }else{
        hdfsPath = hdfsPath + "/" + dstPath;
    }

    return hdfsPath;
}

/**
 * close方法
 */
private void close(FileSystem fileSystem){
    if(fileSystem != null){
        try {
            fileSystem.close();
        } catch (IOException e) {
            logger.error(e.getMessage());
        }
    }
}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末便脊,一起剝皮案震驚了整個(gè)濱河市蚂四,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌哪痰,老刑警劉巖遂赠,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異妒御,居然都是意外死亡解愤,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門乎莉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人奸笤,你說我怎么就攤上這事惋啃。” “怎么了监右?”我有些...
    開封第一講書人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵边灭,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我健盒,道長(zhǎng)绒瘦,這世上最難降的妖魔是什么称簿? 我笑而不...
    開封第一講書人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮惰帽,結(jié)果婚禮上憨降,老公的妹妹穿的比我還像新娘。我一直安慰自己该酗,他們只是感情好授药,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著乐导,像睡著了一般路狮。 火紅的嫁衣襯著肌膚如雪绑咱。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,741評(píng)論 1 289
  • 那天娇澎,我揣著相機(jī)與錄音,去河邊找鬼睹晒。 笑死趟庄,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的册招。 我是一名探鬼主播岔激,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼是掰!你這毒婦竟也來了虑鼎?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤键痛,失蹤者是張志新(化名)和其女友劉穎炫彩,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體絮短,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡江兢,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了丁频。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片杉允。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖席里,靈堂內(nèi)的尸體忽然破棺而出叔磷,到底是詐尸還是另有隱情,我是刑警寧澤奖磁,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布改基,位于F島的核電站,受9級(jí)特大地震影響咖为,放射性物質(zhì)發(fā)生泄漏秕狰。R本人自食惡果不足惜稠腊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望鸣哀。 院中可真熱鬧架忌,春花似錦、人聲如沸诺舔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)低飒。三九已至许昨,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間褥赊,已是汗流浹背糕档。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留拌喉,地道東北人速那。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像尿背,于是被迫代替她去往敵國(guó)和親端仰。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 通過API操作HDFS 今天的主要內(nèi)容 HDFS獲取文件系統(tǒng) HDFS文件上傳 HDFS文件下載 HDFS目錄創(chuàng)建...
    須臾之北閱讀 2,699評(píng)論 0 3
  • 前言 Netflix電影推薦的百萬美金比賽田藐,把“推薦”變成了時(shí)下最熱門的數(shù)據(jù)挖掘算法之一荔烧。也正是由于Netflix...
    Alukar閱讀 1,509評(píng)論 0 11
  • kerberos 介紹 閱讀本文之前建議先預(yù)讀下面這篇博客kerberos認(rèn)證原理---講的非常細(xì)致,易懂 Ker...
    PunyGod閱讀 20,007評(píng)論 7 29
  • 本文以Loadrunner的Java_Vuser腳本為例汽久,來做一次HDFS的文件操作測(cè)試鹤竭,由于LoadRunner...
    smooth00閱讀 390評(píng)論 0 1
  • 獨(dú)坐朝南路 孤身向北庭 秋風(fēng)千滴露 冷雨四周星 謫別南過海 當(dāng)窗思北冥 他鄉(xiāng)人是客 羈旅嘆伶仃
    思明帝閱讀 201評(píng)論 0 1