springboot通過Java Api操作Docker中的Hadoop(填坑)

Hadoop是一個(gè)分布式的文件系統(tǒng)(HDFS)娱挨,由很多服務(wù)器聯(lián)合起來實(shí)現(xiàn)其功能,集群中的服務(wù)器有各自的角色捕犬,用于存儲(chǔ)文件通過目錄樹來定位文件跷坝。
HDFS集群包括,NameNode碉碉、DataNode柴钻、Secondary Namenode:
(1)NameNode:負(fù)責(zé)管理整個(gè)文件系統(tǒng)的元數(shù)據(jù),以及每一個(gè)路徑(文件)所對(duì)應(yīng)的數(shù)據(jù)塊信息垢粮。
(2)DataNode:負(fù)責(zé)管理用戶的文件數(shù)據(jù)塊纵装,每一個(gè)數(shù)據(jù)塊都可以在多個(gè)datanode上存儲(chǔ)多個(gè)副本职车。
(3)Secondary NameNode用來監(jiān)控HDFS狀態(tài)的輔助后臺(tái)程序尖奔,每隔一段時(shí)間獲取HDFS元數(shù)據(jù)的快照。

HDFS可以通過Java Api來實(shí)現(xiàn)對(duì)HDFS內(nèi)的文件進(jìn)行讀寫操作占键。

1、Hadoop安裝
由于本篇重點(diǎn)講HDFS的開發(fā)绍些,Hadoop的安裝配置就不重點(diǎn)講捞慌,我們通過最快速的方式來實(shí)現(xiàn)Hadoop安裝,即通過下載別人已經(jīng)配置好的Docker鏡像進(jìn)行Hadoop安裝柬批。
我的鏡像:registry.cn-hangzhou.aliyuncs.com/xvjialing/hadoop
這個(gè)鏡像是已經(jīng)完全配置好的Hadoop啸澡,通過偽分布式方式(管理節(jié)點(diǎn)和數(shù)據(jù)節(jié)點(diǎn)在一臺(tái)機(jī))部署,直接運(yùn)行即可使用氮帐。
(1)創(chuàng)建容器:

[root@iZbp13sno1lc2yxlhjc4b3Z ~]# docker run --name hadoop -d -p 8091:50070 -p 8092:9000 registry.cn-hangzhou.aliyuncs.com/xvjialing/hadoop

說明:
50070端口:提供HDFS的管理控制臺(tái)嗅虏,可在瀏覽器里面查看HDFS各節(jié)點(diǎn)信息和文件目錄
9000端口:提供訪問HDFS文件系統(tǒng)地址,通過該端口對(duì)文件進(jìn)行讀寫操作

該鏡像hadoop安裝目錄:/usr/local/hadoop

(2)配置文件:
2個(gè)重要的核心配置文件上沐,hdfs-site.xml皮服、core-site.xml。配置文件目錄:/usr/local/hadoop/etc/hadoop
hdfs-site.xml参咙,配置hdfs各節(jié)點(diǎn)目錄地址:

<?xml version="1.0"?>
<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:///root/hdfs/namenode</value>
        <description>NameNode directory for namespace and transaction logs storage.</description>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:///root/hdfs/datanode</value>
        <description>DataNode directory</description>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

core-site.xml龄广,配置hdfs的訪問地址端口:

<?xml version="1.0"?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000/</value>
    </property>
</configuration>

(3)訪問控制臺(tái)
瀏覽器輸入http://192.168.2.104:8091


image.png

可查看hdfs的各節(jié)點(diǎn)健康信息以及文件目錄信息。

2蕴侧、hadoop項(xiàng)目搭建
使用springboot快速搭建項(xiàng)目結(jié)構(gòu)择同,在項(xiàng)目里面引入hadoop相關(guān)的依賴包即可。
(1)pom.xml

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--hadoop依賴-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>

(2)application.yml
主要配置hadoop的訪問地址净宵,以及訪問用戶

#HDFS配置
hdfs:
  path: hdfs://192.168.2.104:8092
  user: root

注意:
path:我這里是docker主機(jī)的地址和端口敲才,對(duì)應(yīng)映射是docker里面hdfs配置的地址端口,即:hdfs://localhost:9000
user:這里是個(gè)坑點(diǎn)择葡,這個(gè)用戶必須是運(yùn)行hadoop的系統(tǒng)用戶紧武,否則會(huì)報(bào)沒有權(quán)限操作的異常,由于docker是root用戶敏储,所以填root阻星。你可以試下隨便填,看報(bào)什么錯(cuò)

(3)主要的JAVA類
操作HDFS涉及到的主要類就兩個(gè)已添,比較簡單:
Configuration:封裝客戶端和服務(wù)端的配置迫横,就是訪問hdfs的地址path和用戶user。
FileSystem:封裝操作文件系統(tǒng)對(duì)象酝碳,提供了很多方法來對(duì)hdfs進(jìn)行操作,就是對(duì)目錄/文件進(jìn)行增/刪/改/查/上傳/下載/遍歷等恨狈。

3疏哗、通過API操作hdfs
(1)獲取FileSystem對(duì)象

public class HdfsService {

    @Value("${hdfs.path}")
    private String hdfsPath;

    @Value("${hdfs.user}")
    private String user;

    /**
     * 獲取hdfs配置信息
     * @return
     */
    private Configuration getConfiguration(){
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        return configuration;
    }

    /**
     * 獲取文件系統(tǒng)對(duì)象
     * @return
     */
    public FileSystem getFileSystem() throws Exception {
        FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), user);
        return fileSystem;
    }
...
}

(2)通過FileSystem操作hdfs
在Service層添加如下API操作代碼:


    /**
     * 創(chuàng)建HDFS文件夾
     * @param dir
     * @return
     * @throws Exception
     */
    public boolean mkdir(String dir) throws Exception{
        if(StringUtils.isBlank(dir)){
            return false;
        }
        if(exist(dir)){
            return true;
        }
        FileSystem fileSystem = getFileSystem();
        boolean isOk = fileSystem.mkdirs(new Path(dir));
        fileSystem.close();
        return isOk;
    }

    /**
     * 判斷HDFS的文件是否存在
     * @param path
     * @return
     * @throws Exception
     */
    public boolean exist(String path) throws Exception {
        if(StringUtils.isBlank(path)){
            return false;
        }
        FileSystem fileSystem = getFileSystem();
        return fileSystem.exists(new Path(path));
    }

    /**
     * 讀取路徑下的文件信息
     * @param path
     * @return
     * @throws Exception
     */
    public List<Map<String,Object>> readPathInfo(String path) throws Exception {
        if(!exist(path)){
            return null;
        }
        FileSystem fs = getFileSystem();
        FileStatus[] statuses = fs.listStatus(new Path(path));
        if(statuses == null || statuses.length < 1){
            return null;
        }
        List<Map<String,Object>> list = new ArrayList<>();
        for(FileStatus fileStatus : statuses){
            Map<String,Object> map = new HashMap<>();
            map.put("filePath", fileStatus.getPath());
            map.put("fileStatus", fileStatus.toString());

            list.add(map);
        }
        return  list;
    }

    /**
     * HDFS創(chuàng)建文件
     * @param path
     * @param file
     * @throws Exception
     */
    public void createFile(String path, MultipartFile file) throws Exception {
        if(StringUtils.isBlank(path) || null == file){
            return;
        }
        FileSystem fs = getFileSystem();
        String fileName = file.getOriginalFilename();//文件名
        Path filePath = new Path(path + "/" + fileName);
        FSDataOutputStream outputStream = fs.create(filePath);
        outputStream.write(file.getBytes());
        outputStream.close();
        fs.close();
    }

    /**
     * 讀取HDFS文件內(nèi)容
     * @param path
     * @return
     * @throws Exception
     */
    public String readFileToString(String path) throws Exception{
        if(!exist(path)){
            return null;
        }
        FileSystem fs = getFileSystem();
        FSDataInputStream inputStream = null;
        try {
            inputStream = fs.open(new Path(path));
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
            StringBuffer sb = new StringBuffer();
            String line = "";
            while ((line = reader.readLine()) != null){
                sb.append(line);
            }
            return sb.toString();
        }finally {
            if(inputStream != null){
                inputStream.close();
            }
            fs.close();
        }
    }

    /**
     * 獲取目錄下的文件列表
     * @param path
     * @return
     * @throws Exception
     */
    public List<Map<String,Object>> listFiles(String path) throws Exception {
        if(!exist(path)){
            return null;
        }
        FileSystem fs = getFileSystem();
        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(path), true);
        List<Map<String,Object>> list = new ArrayList<>();
        while (iterator.hasNext()){
            LocatedFileStatus fileStatus = iterator.next();
            Map<String,Object> map = new HashMap<>();
            map.put("filePath", fileStatus.getPath().toString());
            map.put("fileName", fileStatus.getPath().getName());

            list.add(map);
        }
        return  list;
    }

    /**
     * 重命名HDFS文件
     * @param oldName
     * @param newName
     * @return
     * @throws Exception
     */
    public boolean renameFile(String oldName, String newName)throws Exception{
        if(!exist(oldName) || StringUtils.isBlank(newName)){
            return false;
        }
        FileSystem fs = getFileSystem();
        boolean isOk = fs.rename(new Path(oldName), new Path(newName));
        fs.close();
        return isOk;
    }

    /**
     * 刪除HDFS文件
     * @param path
     * @return
     * @throws Exception
     */
    public boolean deleteFile(String path)throws Exception {
        if(!exist(path)){
            return false;
        }
        FileSystem fs = getFileSystem();
        boolean isOk = fs.deleteOnExit(new Path(path));
        fs.close();
        return isOk;
    }

    /**
     * 上傳文件到HDFS
     * @param path
     * @param uploadPath
     * @throws Exception
     */
    public void uploadFile(String path,String uploadPath) throws Exception{
        if(StringUtils.isBlank(path) || StringUtils.isBlank(uploadPath)){
            return;
        }
        FileSystem fs = getFileSystem();
        fs.copyFromLocalFile(new Path(path), new Path(uploadPath));
        fs.close();
    }

    /**
     * 從HDFS下載文件
     * @param path
     * @param downloadPath
     * @throws Exception
     */
    public void downloadFile(String path, String downloadPath) throws Exception{
        if(StringUtils.isBlank(path) || StringUtils.isBlank(downloadPath)){
            return;
        }
        FileSystem fs = getFileSystem();
        fs.copyToLocalFile(new Path(path), new Path(downloadPath) );
        fs.close();
    }

    /**
     * 拷貝HDFS文件
     * @param sourcePath
     * @param targetPath
     * @throws Exception
     */
    public void copyFile(String sourcePath, String targetPath) throws Exception{
        if(StringUtils.isBlank(sourcePath) || StringUtils.isBlank(targetPath)){
            return;
        }
        FileSystem fs = getFileSystem();
        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try{
            inputStream = fs.open(new Path(sourcePath));
            outputStream = fs.create(new Path(targetPath));
            //todo IOUtils.copyBytes(inputStream, outputStream, , false);
        }finally {
            if(inputStream != null){
                inputStream.close();
            }
            if(outputStream != null){
                outputStream.close();
            }
            fs.close();
        }
    }

    /**
     * 讀取HDFS文件并返回byte[]
     * @param path
     * @return
     * @throws Exception
     */
    public byte[] readFileToBytes(String path) throws Exception{
        if(!exist(path)){
            return null;
        }
        FileSystem fs = getFileSystem();
        FSDataInputStream inputStream = null;
        try {
            inputStream = fs.open(new Path(path));
            return IOUtils.readFullyToByteArray(inputStream);
        }finally {
            if(inputStream != null){
                inputStream.close();
            }
            fs.close();
        }
    }

    /**
     * 獲取文件塊在集群的位置
     * @param path
     * @return
     * @throws Exception
     */
    public BlockLocation[] getFileBlockLocations(String path)throws Exception{
        if(exist(path)){
            return null;
        }
        FileSystem fs = getFileSystem();
        FileStatus fileStatus = fs.getFileStatus(new Path(path));
        return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }

(3)編寫Controller提供接口


@RestController
@RequestMapping("/api/hdfs")
public class HdfsController {

    @Autowired
    private HdfsService service;

    @GetMapping("/mkdir")
    public Object mkdir(String path){
        try {
            service.mkdir(path);
            return RtnData.ok();
        } catch (Exception e) {
            return RtnData.fail(e);
        }
    }

    @PostMapping("/createFile")
    public Object createFile(String path, MultipartFile file){
        try {
            service.createFile(path, file);
            return RtnData.ok();
        } catch (Exception e) {
            return RtnData.fail(e);
        }
    }

    @GetMapping("/readFileToString")
    public Object readFileToString(String path){
        try {
            return RtnData.ok(service.readFileToString(path));
        } catch (Exception e) {
            return RtnData.fail(e);
        }
    }
...
}

4、來個(gè)測試
我們通過調(diào)用/api/hdfs/mkdir接口來創(chuàng)建目錄禾怠,然后在控制臺(tái)看下是文件目錄是否存在返奉。
通過postman調(diào)用接口贝搁,創(chuàng)建/test目錄:

image.png

登錄控制臺(tái),菜單Utilities/Browse the file siystem芽偏,t可以看到已經(jīng)成功添加:
image.png

5雷逆、填下坑,一定要看
下面的幾個(gè)坑點(diǎn)我搞了好幾天才解決污尉。
(1)可能你會(huì)遇到Connect Refused之類的錯(cuò)誤膀哲。
修改core-site.xml,將hdfs://localhost:9000/中的localhost修改為容器地址
(2)可能你會(huì)遇到Permission denied之類的錯(cuò)誤
客戶端調(diào)用的時(shí)候被碗,user不是運(yùn)行hdfs的用戶/用戶組某宪,沒有權(quán)限操作。
(3)可能你會(huì)遇到HADOOP_HOME and hadoop.home.dir are unset之類的報(bào)錯(cuò)
客戶端沒有安裝設(shè)置hadoop的環(huán)境變量:HADOOP_HOME锐朴⌒宋梗可以不予理會(huì),不影響操作焚志。
(4)可能你會(huì)遇到Could not locate Hadoop executable: xxx\bin\winutils.exe之類的報(bào)錯(cuò)
客戶端hadoop的bin目錄沒有winutils.exe文件衣迷。可以不予理會(huì)酱酬,不影響操作壶谒。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市岳悟,隨后出現(xiàn)的幾起案子佃迄,更是在濱河造成了極大的恐慌,老刑警劉巖贵少,帶你破解...
    沈念sama閱讀 216,324評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件呵俏,死亡現(xiàn)場離奇詭異,居然都是意外死亡滔灶,警方通過查閱死者的電腦和手機(jī)普碎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來录平,“玉大人麻车,你說我怎么就攤上這事《氛猓” “怎么了动猬?”我有些...
    開封第一講書人閱讀 162,328評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長表箭。 經(jīng)常有香客問我赁咙,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,147評(píng)論 1 292
  • 正文 為了忘掉前任彼水,我火速辦了婚禮崔拥,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘凤覆。我一直安慰自己链瓦,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評(píng)論 6 388
  • 文/花漫 我一把揭開白布盯桦。 她就那樣靜靜地躺著慈俯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪俺附。 梳的紋絲不亂的頭發(fā)上肥卡,一...
    開封第一講書人閱讀 51,115評(píng)論 1 296
  • 那天,我揣著相機(jī)與錄音事镣,去河邊找鬼步鉴。 笑死,一個(gè)胖子當(dāng)著我的面吹牛璃哟,可吹牛的內(nèi)容都是我干的氛琢。 我是一名探鬼主播,決...
    沈念sama閱讀 40,025評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼随闪,長吁一口氣:“原來是場噩夢啊……” “哼阳似!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起铐伴,我...
    開封第一講書人閱讀 38,867評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤撮奏,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后当宴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體畜吊,經(jīng)...
    沈念sama閱讀 45,307評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評(píng)論 2 332
  • 正文 我和宋清朗相戀三年户矢,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了玲献。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,688評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡梯浪,死狀恐怖捌年,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情挂洛,我是刑警寧澤礼预,帶...
    沈念sama閱讀 35,409評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站虏劲,受9級(jí)特大地震影響逆瑞,放射性物質(zhì)發(fā)生泄漏荠藤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評(píng)論 3 325
  • 文/蒙蒙 一获高、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧吻育,春花似錦念秧、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至游两,卻和暖如春砾层,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背贱案。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評(píng)論 1 268
  • 我被黑心中介騙來泰國打工肛炮, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人宝踪。 一個(gè)月前我還...
    沈念sama閱讀 47,685評(píng)論 2 368
  • 正文 我出身青樓侨糟,卻偏偏與公主長得像,于是被迫代替她去往敵國和親瘩燥。 傳聞我的和親對(duì)象是個(gè)殘疾皇子秕重,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評(píng)論 2 353