Goal of the example
The code sorts the input data pairs: first by the first column in ascending order, and where the first columns are equal, by the second column in ascending order. That is:
Create a cp.txt file with the contents below and upload it to the /input folder on HDFS:
$vim cp.txt
$hadoop fs -put cp.txt /input/
5,1
3,2
1,3
4,3
2,3
1,4
1,2
2,5
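To confirm the upload, printing the file from HDFS should show the eight lines above:
$hadoop fs -cat /input/cp.txt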
Expected output:
1,2
1,3
1,4
2,3
2,5
3,2
4,3
5,1
Hands-on walkthrough
1. In a terminal, run start-all.sh
2. Add the cp.txt file under the input folder, as above
3. Open Eclipse
4. Create a new MapReduce project, a package named mr, and a class named MySortClass; the class code is below
5. Right-click the class and choose Run As > Run on Hadoop
6. Right-click the Hadoop file tree and refresh; once the job succeeds, the sorted result file appears under output
package mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MySortClass {

    // Mapper: parse each "first,second" line into the composite key A;
    // the value carries no information, so NullWritable is emitted.
    static class MySortMapper extends Mapper<LongWritable, Text, A, NullWritable> {
        public void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            String[] lines = v1.toString().split(",");
            A a1 = new A(Long.parseLong(lines[0]), Long.parseLong(lines[1]));
            context.write(a1, NullWritable.get());
            System.out.println("map......");
        }
    }
    // Reducer: keys arrive already sorted by A.compareTo; split the
    // composite key back into two text columns for the output file.
    static class MySortReduce extends Reducer<A, NullWritable, Text, Text> {
        public void reduce(A k2, Iterable<NullWritable> v2, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(Long.toString(k2.firstNum)), new Text(Long.toString(k2.secondNum)));
            System.out.println("reduce......");
        }
    }
    // Composite key holding one data pair; WritableComparable supplies both
    // the serialization (write/readFields) and the sort order (compareTo).
    private static class A implements WritableComparable<A> {
        long firstNum;
        long secondNum;

        public A() {
        }

        public A(long first, long second) {
            firstNum = first;
            secondNum = second;
        }

        public void write(DataOutput out) throws IOException {
            out.writeLong(firstNum);
            out.writeLong(secondNum);
        }

        public void readFields(DataInput in) throws IOException {
            firstNum = in.readLong();
            secondNum = in.readLong();
        }

        /*
         * compareTo is called whenever keys are sorted. Long.compare avoids
         * the overflow that casting a long difference to int could cause.
         */
        @Override
        public int compareTo(A anotherKey) {
            int cmp = Long.compare(firstNum, anotherKey.firstNum);
            if (cmp != 0) {
                // The first columns differ, so they decide the order.
                return cmp;
            }
            // First columns are equal: fall back to the second column.
            return Long.compare(secondNum, anotherKey.secondNum);
        }
    }
    private static String INPUT_PATH = "hdfs://master:9000/input/cp.txt";
    private static String OUTPUT_PATH = "hdfs://master:9000/output/c/";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Remove a stale output directory; the job refuses to start if it exists.
        FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH), conf);
        if (fs.exists(new Path(OUTPUT_PATH)))
            fs.delete(new Path(OUTPUT_PATH), true);
        Job job = new Job(conf, "myjob");
        job.setJarByClass(MySortClass.class);
        job.setMapperClass(MySortMapper.class);
        job.setReducerClass(MySortReduce.class);
        // job.setCombinerClass(MySortReduce.class);
        job.setMapOutputKeyClass(A.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        /* If the map and reduce <key,value> types were identical, setting only
           job.setOutputKeyClass() and job.setOutputValueClass() would suffice. */
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.waitForCompletion(true);
    }
}
Understanding parts of the code
Class A implements WritableComparable and defines two fields, firstNum and secondNum.
String[] lines = v1.toString().split(",");
Reads one line, e.g. (5,1), splits it on the comma, and stores the two elements (5) and (1) in the array lines.
A a1 = new A(Long.parseLong(lines[0]), Long.parseLong(lines[1]));
Long.parseLong(lines[0]) converts the string "5" to a long, so a1.firstNum=5 and a1.secondNum=1.
context.write(a1, NullWritable.get());
Writes to the context, making the map output <key, null>. The empty value cannot be created with new NullWritable(); NullWritable is a singleton with a private constructor, so it must be obtained via NullWritable.get().
context.write(new Text(Long.toString(k2.firstNum)), new Text(Long.toString(k2.secondNum)));
The reduce step builds a new key/value pair, e.g. turning <(5,1), null> into <5,1>.
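The ordering rule itself can be checked without a cluster. This minimal sketch (plain Java, no Hadoop; the class name SortLogicDemo is invented for illustration) replays the A.compareTo logic over the sample input and prints the expected output:
import java.util.ArrayList;
import java.util.List;

public class SortLogicDemo {
    public static void main(String[] args) {
        String[] input = {"5,1", "3,2", "1,3", "4,3", "2,3", "1,4", "1,2", "2,5"};
        List<long[]> pairs = new ArrayList<long[]>();
        for (String line : input) {
            String[] cols = line.split(",");   // same parsing as the mapper
            pairs.add(new long[]{Long.parseLong(cols[0]), Long.parseLong(cols[1])});
        }
        // Same rule as A.compareTo: first column, then second column.
        pairs.sort((a, b) -> a[0] != b[0] ? Long.compare(a[0], b[0])
                                          : Long.compare(a[1], b[1]));
        for (long[] p : pairs) {
            System.out.println(p[0] + "," + p[1]);   // matches the expected output above
        }
    }
}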
Understanding the example as a cluster map->reduce flow (pretending the file were huge)
1. First the input file is split (InputSplit). Assuming a split size of three lines, we get three splits:
Split 1:
5,1
3,2
1,3
Split 2:
4,3
2,3
1,4
Split 3:
1,2
2,5
2. The three splits are handled by three map tasks, each producing key/value pairs <a1, null>. To cut network traffic, every node sorts its output locally and partitions it (the partitioner tags each record with its target partition); a sketch of a custom partitioner follows the list below. The per-map results:
(If needed, a combiner, i.e. a local reduce, can also run before partitioning; see the explanation of combiners at the bottom of the article 《了解MapReduce》. No combiner is needed here.)
Map 1 output:
<(1,3), null>
<(3,2), null>
<(5,1), null>
Map 2 output:
<(1,4), null>
<(2,3), null>
<(4,3), null>
Map 3 output:
<(1,2), null>
<(2,5), null>
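The default partitioner is HashPartitioner, which routes each key by its hash. A custom partitioner could instead route by the first column so that each reducer receives a contiguous key range. This is a hypothetical sketch only: this example does not need it, the class name FirstColumnPartitioner is invented, and the key class A would have to be made accessible (e.g. public and top-level):
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstColumnPartitioner extends Partitioner<A, NullWritable> {
    @Override
    public int getPartition(A key, NullWritable value, int numPartitions) {
        // Assumes firstNum lies in 1..5; with 3 reducers this sends
        // 1-2 to partition 0, 3-4 to partition 1, and 5 to partition 2.
        return (int) ((key.firstNum - 1) * numPartitions / 5);
    }
}
It would be registered with job.setPartitionerClass(FirstColumnPartitioner.class).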
- Then all nodes shuffle: records belonging to the same partition are moved to a single node and sorted once they arrive:
<(1,2), null>
<(1,3), null>
<(1,4), null>
<(2,3), null>
<(2,5), null>
<(3,2), null>
<(4,3), null>
<(5,1), null>
- Finally comes reduce, which builds the new key/value pairs and emits the final sorted result:
(1,2)
(1,3)
(1,4)
(2,3)
(2,5)
(3,2)
(4,3)
(5,1)
In short: map (local) -> combiner (local) -> partitioner (local) -> shuffle (cluster-wide) -> reduce (on a new node). Each stage has finer-grained steps; the combiner and partitioner belong to the map phase, the shuffle to the reduce phase. The Job hooks for the optional local stages are sketched below.
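For orientation, this is where each stage would be wired onto the Job. MyCombiner and MyPartitioner are placeholders: a combiner here would have to be a Reducer<A, NullWritable, A, NullWritable>, since a combiner's output types must match the map output types; MySortReduce emits <Text, Text>, which is presumably why the setCombinerClass line is commented out in the code above.
job.setMapperClass(MySortMapper.class);        // map (local)
job.setCombinerClass(MyCombiner.class);        // combiner (local, optional placeholder)
job.setPartitionerClass(MyPartitioner.class);  // partitioner (local, optional placeholder)
job.setReducerClass(MySortReduce.class);       // reduce (after the shuffle)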