MapReduce 分區(qū)
分區(qū)概述
在 MapReduce 中, 通過我們指定分區(qū), 會將同一個分區(qū)的數(shù)據(jù)發(fā)送到同一個 Reduce 當(dāng)中進(jìn)行處理
例如: 為了數(shù)據(jù)的統(tǒng)計, 可以把一批類似的數(shù)據(jù)發(fā)送到同一個 Reduce 當(dāng)中, 在同一個 Reduce 當(dāng)中統(tǒng)計相同類型的數(shù)據(jù), 就可以實(shí)現(xiàn)類似的數(shù)據(jù)分區(qū)和統(tǒng)計等
其實(shí)就是相同類型的數(shù)據(jù), 有共性的數(shù)據(jù), 送到一起去處理
Reduce 當(dāng)中默認(rèn)的分區(qū)只有一個
需求:將以下數(shù)據(jù)進(jìn)行分開處理
詳細(xì)數(shù)據(jù)參見partition.csv 這個文本文件谷浅,其中第五個字段表示開獎結(jié)果數(shù)值寨典,現(xiàn)在需求將15以上的結(jié)果以及15以下的結(jié)果進(jìn)行分開成兩個文件進(jìn)行保存
分區(qū)步驟:
Step 1. 定義 Mapper
這個 Mapper 程序不做任何邏輯, 也不對 Key-Value 做任何改變, 只是接收數(shù)據(jù), 然后往下發(fā)送
public class MyMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}
Step 2. 自定義 Partitioner
主要的邏輯就在這里, 這也是這個案例的意義, 通過 Partitioner 將數(shù)據(jù)分發(fā)給不同的 Reducer
/**
* 這里的輸入類型與我們map階段的輸出類型相同
*/
public class MyPartitioner extends Partitioner<Text,NullWritable>{
/**
* 返回值表示我們的數(shù)據(jù)要去到哪個分區(qū)
* 返回值只是一個分區(qū)的標(biāo)記,標(biāo)記所有相同的數(shù)據(jù)去到指定的分區(qū)
*/
@Override
public int getPartition(Text text, NullWritable nullWritable, int i) {
String result = text.toString().split("\t")[5];
if (Integer.parseInt(result) > 15){
return 1;
}else{
return 0;
}
}
}
Step 3. 定義 Reducer 邏輯
這個 Reducer 也不做任何處理, 將數(shù)據(jù)原封不動的輸出即可
public class MyReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
Step 4. 主類中設(shè)置分區(qū)類和ReduceTask個數(shù)
public class PartitionMain extends Configured implements Tool {
public static void main(String[] args) throws Exception{
int run = ToolRunner.run(new Configuration(), new PartitionMain(), args);
System.exit(run);
}
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), PartitionMain.class.getSimpleName());
job.setJarByClass(PartitionMain.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://192.168.52.250:8020/partitioner"));
TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.52.250:8020/outpartition"));
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setReducerClass(MyReducer.class);
/**
* 設(shè)置我們的分區(qū)類耕餐,以及我們的reducetask的個數(shù),注意reduceTask的個數(shù)一定要與我們的
* 分區(qū)數(shù)保持一致
*/
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(2);
boolean b = job.waitForCompletion(true);
return b?0:1;
}
}
MapReduce 中的計數(shù)器
計數(shù)器是收集作業(yè)統(tǒng)計信息的有效手段之一,用于質(zhì)量控制或應(yīng)用級統(tǒng)計。計數(shù)器還可輔助診斷系統(tǒng)故障红竭。如果需要將日志信息傳輸?shù)?map 或 reduce 任務(wù), 更好的方法通常是看能否用一個計數(shù)器值來記錄某一特定事件的發(fā)生喘落。對于大型分布式作業(yè)而言茵宪,使用計數(shù)器更為方便。除了因?yàn)楂@取計數(shù)器值比輸出日志更方便瘦棋,還有根據(jù)計數(shù)器值統(tǒng)計特定事件的發(fā)生次數(shù)要比分析一堆日志文件容易得多稀火。
hadoop內(nèi)置計數(shù)器列表
MapReduce任務(wù)計數(shù)器 | org.apache.hadoop.mapreduce.TaskCounter |
---|---|
文件系統(tǒng)計數(shù)器 | org.apache.hadoop.mapreduce.FileSystemCounter |
FileInputFormat計數(shù)器 | org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter |
FileOutputFormat計數(shù)器 | org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter |
作業(yè)計數(shù)器 | org.apache.hadoop.mapreduce.JobCounter |
每次mapreduce執(zhí)行完成之后,我們都會看到一些日志記錄出來兽狭,其中最重要的一些日志記錄如下截圖
所有的這些都是MapReduce的計數(shù)器的功能憾股,既然MapReduce當(dāng)中有計數(shù)器的功能,我們?nèi)绾螌?shí)現(xiàn)自己的計數(shù)器箕慧?服球??
需求:以以上分區(qū)代碼為案例颠焦,統(tǒng)計map接收到的數(shù)據(jù)記錄條數(shù)
第一種方式
第一種方式定義計數(shù)器斩熊,通過context上下文對象可以獲取我們的計數(shù)器,進(jìn)行記錄
通過context上下文對象伐庭,在map端使用計數(shù)器進(jìn)行統(tǒng)計
public class PartitionMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
//map方法將K1和V1轉(zhuǎn)為K2和V2
@Override
protected void map(LongWritable key, Text value, Context context) throws Exception{
Counter counter = context.getCounter("MR_COUNT", "MyRecordCounter");
counter.increment(1L);
context.write(value,NullWritable.get());
}
}
運(yùn)行程序之后就可以看到我們自定義的計數(shù)器在map階段讀取了七條數(shù)據(jù)
第二種方式
通過enum枚舉類型來定義計數(shù)器
統(tǒng)計reduce端數(shù)據(jù)的輸入的key有多少個
public class PartitionerReducer extends Reducer<Text,NullWritable,Text,NullWritable> {
public static enum Counter{
MY_REDUCE_INPUT_RECORDS,MY_REDUCE_INPUT_BYTES
}
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.getCounter(Counter.MY_REDUCE_INPUT_RECORDS).increment(1L);
context.write(key, NullWritable.get());
}
}