HDFS中的Java API的使用

上傳文件

PutFile.java

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PutFile {

    public static void main(String[] args) throws IOException,URISyntaxException {
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://192.168.56.31:9000");
        FileSystem fs = FileSystem.get(uri,conf);
        //本地文件
        Path src = new Path("D:\\scala\\文檔\\63\\access.txt");
        //HDFS存放位置
        Path dst = new Path("/");
        fs.copyFromLocalFile(src, dst);
        System.out.println("Upload to " + conf.get("fs.defaultFS"));
        // 以下相當(dāng)于執(zhí)行hdfs dfs -ls /
        FileStatus files[] = fs.listStatus(dst);
        
        for (FileStatus file:files) {
            System.out.println(file.getPath());
        }
        
        

    }

}

創(chuàng)建文件

CreateFile.java


import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateFile {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
        // 定義新文件
        Path dfs = new Path("/hdfsfile");
        // 創(chuàng)建新文件敢靡,如果有則覆蓋(true)
        FSDataOutputStream create = fs.create(dfs,true);
        
        create.writeBytes("Hello,HDFS !");

    }

}

查看文件詳細信息

FileLocation.java

import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileLocation {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
        Path fpath = new Path("/access.txt");
        FileStatus filestatus = fs.getFileStatus(fpath);
        /*
         * 獲取文件在HDFS集群位置:
         * FileSystem.getFileBlockLocation(FileStatus file,long start, long len)"
         * 可查找指定文件在HDFS集群上的位置,其中file為文件的完整路徑著隆,start和len來標(biāo)識查找文件的路徑
         */
        BlockLocation[]blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
        filestatus.getAccessTime();
            for(int i=0;i<blkLocations.length;i++) {
                String[] hosts = blkLocations[i].getHosts();
                System.out.println("block_"+i+"_location:"+hosts[0]);
            }
        // 格式化日期輸出
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 獲取文件訪問時間,返回long
        long accessTime = filestatus.getAccessTime();
        System.out.println("access:"+formatter.format(new Date(accessTime)));
        // 獲取文件修改時間,返回long
        long modificationTime = filestatus.getModificationTime();
        System.out.println("modification:"+formatter.format(new Date(modificationTime)));
        // 獲取塊大小肌访,單位B
        long blockSize = filestatus.getBlockSize();
        System.out.println("blockSize:"+blockSize);
        // 獲取文件大小潘飘,單位B
        long len = filestatus.getLen();
        System.out.println("length:"+len);
        // 獲取文件所在用戶組
        String group = filestatus.getGroup();
        System.out.println("group:"+group);
        // 獲取文件擁有者
        String owner = filestatus.getOwner();
        System.out.println("owner:"+owner);
        // 獲取文件拷貝數(shù)
        short replication = filestatus.getReplication();
        System.out.println("replication:"+replication);
    }

}

下載文件

GetFile.java

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetFile {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"),new Configuration());
        //hdfs上文件
        Path src = new Path("/access.txt");
        // 下載到本地的文件名
        Path dst = new Path("D:\\scala\\文檔\\63\\newfile.txt");
        fs.copyToLocalFile(src, dst);

    }

}

RPC通信

反射機制

Student.java

interface people{
    public void study();
}
public class Student implements people {
    private String name; //名字;
    private int age;
    //構(gòu)造方法1捣作;
    public Student() {}
    // 構(gòu)造方法2;
    public Student(String name,int age) {
        this.name = name;
        this.age = age;
    }
    //set和get方法鹅士;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    
    public void study() {
        System.out.println("正在學(xué)習(xí)");
    }
    // 程序的主方法券躁;
    public static void main(String[] args) {
    // 
    Class<? extends Student> tmp=Student.class;
    String cName = tmp.getName();
    System.out.println("類的名字是"+cName);
    try {
    // 動態(tài)加載指定類名
        Class c = Class.forName(cName);
        //得到類中的方法;
        java.lang.reflect.Method[] ms = c.getMethods();
        for(java.lang.reflect.Method m:ms) {
            System.out.println("方法的名字是"+m.getName());
            System.out.println("方法的返回值類型是"+m.getReturnType().toString());
            System.out.println("方法的參數(shù)類型是"+m.getParameterTypes());
        }
        //得到屬性
        java.lang.reflect.Field[] fields = c.getFields();
        for(java.lang.reflect.Field f:fields) {
            System.out.println("參數(shù)類型是"+f.getType());
        }
        // 得到父接口
        Class[] is = c.getInterfaces();
        for(Class s:is) {
            System.out.println("父接口的名字是"+s.getName());
        }
        // 判斷是否是數(shù)組
        System.out.println("數(shù)組:"+c.isArray());
        String CLName = c.getClassLoader().getClass().getName();
        System.out.println("類加載器:"+CLName);
        // 實例化構(gòu)造器
        java.lang.reflect.Constructor cons = c.getConstructor(String.class,int.class);
        Student stu = (Student) cons.newInstance("hadoop",23);
        System.out.println(stu.getName()+":"+stu.getAge());
    }catch (Exception e) {
        e.printStackTrace();
    }

    }
}

MapReduce實現(xiàn)技術(shù)

WordMapper.java

package wordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// 創(chuàng)建一個WordMapper類繼承于Mapper抽象類
public class WordMapper extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    //Mapper抽象類的核心方法,三個參數(shù)
    public void map( Object key,  //首字符偏移量
                    Text value,   //文件的一行內(nèi)容
                    Context context)  //Mapper端的上下文也拜,與outputCollector和 Reporter的功能類似
                throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }

}

WordReduce.java

package wordcount;



import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// 創(chuàng)建一個WordReducer類繼承于Reducer抽象類
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    private IntWritable result = new IntWritable();   //記錄詞頻
    //Reducer 抽象類的核心方法以舒,3個參數(shù)
    public void reduce( Text key,   //Map端輸出的key值
            Iterable<IntWritable> values,  // Map端輸出的Value集合
            Context context)  
            throws IOException,InterruptedException {
        int sum = 0;
        for (IntWritable val : values)  //遍歷values集合,并把值相加
        {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
    
}

WordMain.java

package wordcount;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordMain {

    public static void main(String[] args) throws Exception {
        //Configuration類:讀取Hadoop的配置文件慢哈,如core-site.xml...;
        // 也可用set方法重新設(shè)置(會覆蓋):conf.set("fs.default.name",//"hdfs://xxxx:9000")
        Configuration conf = new Configuration();
        
        // 將命令行中參數(shù)自動設(shè)置到變量conf中
        String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        //      conf.set("fs.defaultFS", "hdfs://192.168.56.31:9000");
//      conf.set("hadoop.job.user", "root");
//      conf.set("mapreduce.framework.name", "yarn");
//      conf.set("mapreduce.jobtracker.address", "192.168.56.31:9001");
//      conf.set("yarn.resourcemanager.hostname", "192.168.56.31");
//      conf.set("yarn.resourcemanager.admin.address", "192.168.56.31:8033");
//      conf.set("yarn.resourcemanager.address", "192.168.56.31:80312");
//      conf.set("yarn.resourcemanager.resource-tracker.address", "192.168.56.31:8031");
//      conf.set("yarn.resourcemanager.scheduler.address", "192.168.56.31:8030");

        if(otherArgs.length != 2)
        {
            System.err.println("Usage: wordcount <in><out>");
            System.exit(2);
        }
        
        Job job = new Job(conf, "word count");  // 新建一個job蔓钟,傳入配置信息
        job.setJarByClass(WordMain.class);  //設(shè)置主類
        job.setMapperClass(WordMapper.class);  //設(shè)置Mapper類
        job.setCombinerClass(WordReducer.class);  //設(shè)置作業(yè)合成類
        job.setReducerClass(WordReducer.class); //設(shè)置Reducer類
        job.setOutputKeyClass(Text.class);  //設(shè)置輸出數(shù)據(jù)的關(guān)鍵類
        job.setOutputValueClass(IntWritable.class);    //設(shè)置輸出值類
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  //文件輸入
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // 文件輸出
        System.exit(job.waitForCompletion(true) ? 0 : 1);  // 等待完成退出
    }

}

打包上傳

hdfs dfs -mkdir /user/hadoop
hdfs dfs -mkdir /user/hadoop/input
hdfs dfs -put file* /user/hadoop/input
hdfs dfs -ls /user/hadoop/input
hadoop jar wordcount.jar wordcount.WordMain /user/hadoop/input/file* /user/hadoop/output
hdfs dfs -ls /user/hadoop/output
hdfs dfs -text /user/hadoop/output/part-r-00000
hdfs://192.168.56.31:9000/user/hadoop/input
hdfs://192.168.56.31:9000/user/hadoop/output2
image.png

WordCount2.java

package wordcount2;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount2 {
    
    public static class TokenizerMapper extends Mapper<Object, Text, Text,IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void Map(Object key,Text value, Context context) throws IOException,InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens())
            {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    
    public static class IntSumReducer extends Reducer<Text,IntWritable,Text, IntWritable>
    {
        private IntWritable result = new IntWritable();
        public void reduce(Text key,Iterable<IntWritable>value, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val: value)
            {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.56.31:9000");
        conf.set("hadoop.job.user", "root");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("mapreduce.jobtracker.address", "192.168.56.31:9001");
        conf.set("yarn.resourcemanager.hostname", "192.168.56.31");
        conf.set("yarn.resourcemanager.admin.address", "192.168.56.31:8033");
        conf.set("yarn.resourcemanager.address", "192.168.56.31:80312");
        conf.set("yarn.resourcemanager.resource-tracker.address", "192.168.56.31:8031");
        conf.set("yarn.resourcemanager.scheduler.address", "192.168.56.31:8030");
        
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2)
        {
            System.err.println("Usage:wordcount <in><out>");
            System.exit(2);
        }
        
        Job job = new Job(conf, "word count2");  // 新建一個job,傳入配置信息
        job.setJarByClass(WordCount2.class);  //設(shè)置主類
        job.setMapperClass(TokenizerMapper.class);  //設(shè)置Mapper類
        job.setCombinerClass(IntSumReducer.class);  //設(shè)置作業(yè)合成類
        job.setReducerClass(IntSumReducer.class); //設(shè)置Reducer類
        job.setOutputKeyClass(Text.class);  //設(shè)置輸出數(shù)據(jù)的關(guān)鍵類
        job.setOutputValueClass(IntWritable.class);    //設(shè)置輸出值類
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  //文件輸入
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // 文件輸出
        boolean flag = job.waitForCompletion(true);
        System.out.println("SUCCEED !"+flag);  //任務(wù)完成提示
        System.exit(flag ? 0 : 1);
        System.out.println();
    }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末卵贱,一起剝皮案震驚了整個濱河市滥沫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌键俱,老刑警劉巖兰绣,帶你破解...
    沈念sama閱讀 216,843評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異编振,居然都是意外死亡缀辩,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評論 3 392
  • 文/潘曉璐 我一進店門踪央,熙熙樓的掌柜王于貴愁眉苦臉地迎上來雌澄,“玉大人,你說我怎么就攤上這事杯瞻「湮” “怎么了?”我有些...
    開封第一講書人閱讀 163,187評論 0 353
  • 文/不壞的土叔 我叫張陵魁莉,是天一觀的道長睬涧。 經(jīng)常有香客問我,道長旗唁,這世上最難降的妖魔是什么畦浓? 我笑而不...
    開封第一講書人閱讀 58,264評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮检疫,結(jié)果婚禮上讶请,老公的妹妹穿的比我還像新娘。我一直安慰自己屎媳,他們只是感情好夺溢,可當(dāng)我...
    茶點故事閱讀 67,289評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著烛谊,像睡著了一般风响。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上丹禀,一...
    開封第一講書人閱讀 51,231評論 1 299
  • 那天状勤,我揣著相機與錄音鞋怀,去河邊找鬼。 笑死持搜,一個胖子當(dāng)著我的面吹牛密似,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播葫盼,決...
    沈念sama閱讀 40,116評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼辛友,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了剪返?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,945評論 0 275
  • 序言:老撾萬榮一對情侶失蹤邓梅,失蹤者是張志新(化名)和其女友劉穎脱盲,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體日缨,經(jīng)...
    沈念sama閱讀 45,367評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡钱反,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,581評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了匣距。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片面哥。...
    茶點故事閱讀 39,754評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖毅待,靈堂內(nèi)的尸體忽然破棺而出尚卫,到底是詐尸還是另有隱情,我是刑警寧澤尸红,帶...
    沈念sama閱讀 35,458評論 5 344
  • 正文 年R本政府宣布吱涉,位于F島的核電站,受9級特大地震影響外里,放射性物質(zhì)發(fā)生泄漏怎爵。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,068評論 3 327
  • 文/蒙蒙 一盅蝗、第九天 我趴在偏房一處隱蔽的房頂上張望鳖链。 院中可真熱鬧,春花似錦墩莫、人聲如沸芙委。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,692評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽题山。三九已至,卻和暖如春故痊,著一層夾襖步出監(jiān)牢的瞬間顶瞳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,842評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留慨菱,地道東北人焰络。 一個月前我還...
    沈念sama閱讀 47,797評論 2 369
  • 正文 我出身青樓,卻偏偏與公主長得像符喝,于是被迫代替她去往敵國和親闪彼。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,654評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理协饲,服務(wù)發(fā)現(xiàn)畏腕,斷路器,智...
    卡卡羅2017閱讀 134,654評論 18 139
  • 1.import static是Java 5增加的功能,就是將Import類中的靜態(tài)方法茉稠,可以作為本類的靜態(tài)方法來...
    XLsn0w閱讀 1,222評論 0 2
  • 土生土長的我們誓竿,想回憶家鄉(xiāng)磅网,體會家鄉(xiāng)每一寸土地,每一種農(nóng)產(chǎn)品筷屡,歡迎關(guān)注:鄉(xiāng)里村
    pengziren閱讀 451評論 0 0
  • 如果我告訴你,大家的情況幾乎都是這樣规哲,但是有解決方案跟啤,你相信嗎? 我的工作是組建完整的產(chǎn)品研發(fā)團隊唉锌,包括工程師隅肥、設(shè)...
    啟庵閱讀 502評論 0 2
  • 心靈手巧,針起線依彎月皎袄简。經(jīng)緯勾連腥放,霜染眉梢多少年。 錦衣織就绿语,愛滿胸襟情暖袖秃症。攬物思娘候址,何日還鄉(xiāng)侍近旁? 每逢佳...
    靜鈴音閱讀 558評論 20 18