Uploading a File
PutFile.java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class PutFile {
    public static void main(String[] args) throws IOException, URISyntaxException {
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://192.168.56.31:9000");
        FileSystem fs = FileSystem.get(uri, conf);
        // Local source file
        Path src = new Path("D:\\scala\\文檔\\63\\access.txt");
        // Destination directory in HDFS
        Path dst = new Path("/");
        fs.copyFromLocalFile(src, dst);
        // Print the URI of the target file system
        System.out.println("Upload to " + fs.getUri());
        // The following is equivalent to running: hdfs dfs -ls /
        FileStatus[] files = fs.listStatus(dst);
        for (FileStatus file : files) {
            System.out.println(file.getPath());
        }
        fs.close();
    }
}
Creating a File
CreateFile.java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CreateFile {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"), new Configuration());
        // Path of the new file
        Path dfs = new Path("/hdfsfile");
        // Create the file, overwriting it if it already exists (true)
        FSDataOutputStream create = fs.create(dfs, true);
        create.writeBytes("Hello,HDFS !");
        // Close the stream so the data is flushed to HDFS
        create.close();
        fs.close();
    }
}
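To verify the write, the new file can be read back through the same API. The following is a minimal sketch, assuming the /hdfsfile path created above (ReadFile.java is illustrative and not part of the original example).
ReadFile.java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class ReadFile {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"), new Configuration());
        // Open the file created above and print its contents line by line
        FSDataInputStream in = fs.open(new Path("/hdfsfile"));
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
        fs.close();
    }
}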
Viewing File Details
FileLocation.java
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class FileLocation {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"), new Configuration());
        Path fpath = new Path("/access.txt");
        FileStatus filestatus = fs.getFileStatus(fpath);
        /*
         * Locate the file's blocks in the HDFS cluster:
         * FileSystem.getFileBlockLocations(FileStatus file, long start, long len)
         * returns the block locations of the given file, where file is the file's
         * status and start/len delimit the byte range to look up.
         */
        BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
        for (int i = 0; i < blkLocations.length; i++) {
            String[] hosts = blkLocations[i].getHosts();
            System.out.println("block_" + i + "_location:" + hosts[0]);
        }
        // Date formatter for the timestamps below
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Last access time, returned as a long (milliseconds)
        long accessTime = filestatus.getAccessTime();
        System.out.println("access:" + formatter.format(new Date(accessTime)));
        // Last modification time, returned as a long (milliseconds)
        long modificationTime = filestatus.getModificationTime();
        System.out.println("modification:" + formatter.format(new Date(modificationTime)));
        // Block size, in bytes
        long blockSize = filestatus.getBlockSize();
        System.out.println("blockSize:" + blockSize);
        // File length, in bytes
        long len = filestatus.getLen();
        System.out.println("length:" + len);
        // Group the file belongs to
        String group = filestatus.getGroup();
        System.out.println("group:" + group);
        // Owner of the file
        String owner = filestatus.getOwner();
        System.out.println("owner:" + owner);
        // Replication factor of the file
        short replication = filestatus.getReplication();
        System.out.println("replication:" + replication);
        fs.close();
    }
}
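The same details can be collected for every file under a directory: FileSystem.listFiles returns a RemoteIterator of LocatedFileStatus entries that already carry their block locations. A minimal sketch, assuming the directory /user/hadoop/input used later in this section (ListFileLocations.java is illustrative).
ListFileLocations.java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
public class ListFileLocations {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"), new Configuration());
        // Recursively list all files under the directory; each entry carries its block locations
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("/user/hadoop/input"), true);
        while (it.hasNext()) {
            LocatedFileStatus status = it.next();
            System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
            for (BlockLocation blk : status.getBlockLocations()) {
                System.out.println("  hosts: " + String.join(",", blk.getHosts()));
            }
        }
        fs.close();
    }
}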
Downloading a File
GetFile.java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class GetFile {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"), new Configuration());
        // Source file in HDFS
        Path src = new Path("/access.txt");
        // Local destination file name
        Path dst = new Path("D:\\scala\\文檔\\63\\newfile.txt");
        fs.copyToLocalFile(src, dst);
        fs.close();
    }
}
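On a Windows client without the native Hadoop binaries (winutils.exe), copyToLocalFile can fail while writing the local .crc checksum file. A common workaround is the overload that takes a useRawLocalFileSystem flag; a minimal sketch under that assumption, reusing the paths above (GetFileRaw.java is illustrative).
GetFileRaw.java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class GetFileRaw {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.56.31:9000"), new Configuration());
        Path src = new Path("/access.txt");
        Path dst = new Path("D:\\scala\\文檔\\63\\newfile.txt");
        // delSrc = false keeps the source in HDFS;
        // useRawLocalFileSystem = true skips the local .crc checksum file
        fs.copyToLocalFile(false, src, dst, true);
        fs.close();
    }
}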
RPC Communication
The Reflection Mechanism
Student.java
interface people {
    public void study();
}
public class Student implements people {
    private String name; // name
    private int age;
    // Constructor 1
    public Student() {}
    // Constructor 2
    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }
    // Getters and setters
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public void study() {
        System.out.println("Studying");
    }
    // Main method of the program
    public static void main(String[] args) {
        // Obtain the Class object and its fully qualified name
        Class<? extends Student> tmp = Student.class;
        String cName = tmp.getName();
        System.out.println("Class name: " + cName);
        try {
            // Dynamically load the class by name
            Class c = Class.forName(cName);
            // List the class's methods
            java.lang.reflect.Method[] ms = c.getMethods();
            for (java.lang.reflect.Method m : ms) {
                System.out.println("Method name: " + m.getName());
                System.out.println("Return type: " + m.getReturnType().toString());
                System.out.println("Parameter types: " + java.util.Arrays.toString(m.getParameterTypes()));
            }
            // List the public fields
            java.lang.reflect.Field[] fields = c.getFields();
            for (java.lang.reflect.Field f : fields) {
                System.out.println("Field type: " + f.getType());
            }
            // List the implemented interfaces
            Class[] is = c.getInterfaces();
            for (Class s : is) {
                System.out.println("Interface name: " + s.getName());
            }
            // Check whether the class is an array type
            System.out.println("Array: " + c.isArray());
            String CLName = c.getClassLoader().getClass().getName();
            System.out.println("Class loader: " + CLName);
            // Look up the (String, int) constructor and instantiate the class
            java.lang.reflect.Constructor cons = c.getConstructor(String.class, int.class);
            Student stu = (Student) cons.newInstance("hadoop", 23);
            System.out.println(stu.getName() + ":" + stu.getAge());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
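An RPC server typically uses reflection in exactly this way: it receives a method name and arguments over the wire, looks the method up on the target object at runtime, and invokes it. A minimal sketch of that dynamic dispatch, reusing the Student class above (DynamicInvoke.java is illustrative and not part of Hadoop).
DynamicInvoke.java
import java.lang.reflect.Method;
public class DynamicInvoke {
    public static void main(String[] args) throws Exception {
        Object target = new Student("hadoop", 23);
        // The caller only knows the method names as strings, as an RPC server would
        Method getName = target.getClass().getMethod("getName");
        Method setAge = target.getClass().getMethod("setAge", int.class);
        setAge.invoke(target, 30);                  // equivalent to ((Student) target).setAge(30)
        System.out.println(getName.invoke(target)); // prints "hadoop"
    }
}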
MapReduce Implementation
WordMapper.java
package wordcount;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// WordMapper extends the Mapper base class
public class WordMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    // The core method of the Mapper class, with three parameters
    @Override
    public void map(Object key,      // byte offset of the line within the input
                    Text value,      // one line of the input file
                    Context context) // Mapper-side context, similar in role to OutputCollector and Reporter
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
WordReducer.java
package wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// WordReducer extends the Reducer base class
public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable(); // holds the word count
    // The core method of the Reducer class, with three parameters
    @Override
    public void reduce(Text key,                     // key emitted by the map side
                       Iterable<IntWritable> values, // all values emitted for this key
                       Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // Iterate over the values and add them up
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
WordMain.java
package wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordMain {
    public static void main(String[] args) throws Exception {
        // Configuration reads the Hadoop configuration files (core-site.xml, ...);
        // values can also be overridden with set(), e.g. conf.set("fs.defaultFS", "hdfs://xxxx:9000")
        Configuration conf = new Configuration();
        // Copy generic command-line options into conf and return the remaining arguments
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // conf.set("fs.defaultFS", "hdfs://192.168.56.31:9000");
        // conf.set("hadoop.job.user", "root");
        // conf.set("mapreduce.framework.name", "yarn");
        // conf.set("mapreduce.jobtracker.address", "192.168.56.31:9001");
        // conf.set("yarn.resourcemanager.hostname", "192.168.56.31");
        // conf.set("yarn.resourcemanager.admin.address", "192.168.56.31:8033");
        // conf.set("yarn.resourcemanager.address", "192.168.56.31:8032");
        // conf.set("yarn.resourcemanager.resource-tracker.address", "192.168.56.31:8031");
        // conf.set("yarn.resourcemanager.scheduler.address", "192.168.56.31:8030");
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");   // create a job with this configuration
        job.setJarByClass(WordMain.class);               // main class
        job.setMapperClass(WordMapper.class);            // Mapper class
        job.setCombinerClass(WordReducer.class);         // combiner class
        job.setReducerClass(WordReducer.class);          // Reducer class
        job.setOutputKeyClass(Text.class);               // output key type
        job.setOutputValueClass(IntWritable.class);      // output value type
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // output path
        System.exit(job.waitForCompletion(true) ? 0 : 1); // wait for completion, then exit
    }
}
Package and Upload
hdfs dfs -mkdir /user/hadoop
hdfs dfs -mkdir /user/hadoop/input
hdfs dfs -put file* /user/hadoop/input
hdfs dfs -ls /user/hadoop/input
hadoop jar wordcount.jar wordcount.WordMain /user/hadoop/input/file* /user/hadoop/output
hdfs dfs -ls /user/hadoop/output
hdfs dfs -text /user/hadoop/output/part-r-00000
Program arguments for WordCount2 (for example, in the IDE run configuration), input path followed by output path:
hdfs://192.168.56.31:9000/user/hadoop/input
hdfs://192.168.56.31:9000/user/hadoop/output2
WordCount2.java
package wordcount2;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount2 {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.56.31:9000");
        conf.set("hadoop.job.user", "root");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("mapreduce.jobtracker.address", "192.168.56.31:9001");
        conf.set("yarn.resourcemanager.hostname", "192.168.56.31");
        conf.set("yarn.resourcemanager.admin.address", "192.168.56.31:8033");
        conf.set("yarn.resourcemanager.address", "192.168.56.31:8032");
        conf.set("yarn.resourcemanager.resource-tracker.address", "192.168.56.31:8031");
        conf.set("yarn.resourcemanager.scheduler.address", "192.168.56.31:8030");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count2");  // create a job with this configuration
        job.setJarByClass(WordCount2.class);             // main class
        job.setMapperClass(TokenizerMapper.class);       // Mapper class
        job.setCombinerClass(IntSumReducer.class);       // combiner class
        job.setReducerClass(IntSumReducer.class);        // Reducer class
        job.setOutputKeyClass(Text.class);               // output key type
        job.setOutputValueClass(IntWritable.class);      // output value type
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    // input path
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  // output path
        boolean flag = job.waitForCompletion(true);      // wait for the job to finish
        System.out.println("SUCCEED ! " + flag);         // completion message
        System.exit(flag ? 0 : 1);
    }
}