MapReduce Workflow
The workflow is illustrated in the diagram below (not reproduced here).
Explanation
The flow above is the complete MapReduce workflow, but the shuffle process only covers steps 7 through 16 of it. The shuffle process in detail:
- The maptask collects the kv pairs emitted by our map() method and puts them into an in-memory buffer.
- The buffer continually spills to local disk files; several spill files may be produced.
- Multiple spill files are merged into one larger spill file.
- During both spilling and merging, the Partitioner is invoked to partition the data, and the data is sorted by key.
- Each reducetask fetches, according to its partition number, the corresponding partition of result data from every maptask machine.
- A reducetask thus obtains result files of the same partition from different maptasks, and merges them again (merge sort).
- Once the data is merged into one large file, shuffle is over; what follows is the reducetask's logical processing (taking key-value groups from the file one by one and calling the user-defined reduce() method).
Note
The buffer size in shuffle affects the execution efficiency of the MapReduce program: in principle, the larger the buffer, the fewer disk I/Os and the faster the job runs.
The buffer size is configurable via the parameter io.sort.mb, default 100 MB.
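As a minimal illustration (assuming a Hadoop 2.x client, where the old io.sort.mb key has been renamed to mapreduce.task.io.sort.mb), the buffer could be enlarged in the driver roughly like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SortBufferConfig {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Raise the map-side sort buffer from the default 100 MB to 200 MB
        // ("mapreduce.task.io.sort.mb" is the current name of the old io.sort.mb key).
        conf.setInt("mapreduce.task.io.sort.mb", 200);
        // Optionally spill later than the default 80% of the buffer.
        conf.setFloat("mapreduce.map.sort.spill.percent", 0.90f);
        Job job = Job.getInstance(conf, "tuned-sort-buffer");
        // ... mapper/reducer/input/output setup as in the examples below ...
    }
}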
InputFormat Data Input
Job Submission Flow (Source Code)
The main method calls job.waitForCompletion(true), which in turn calls submit(), and the submission process begins. Part of submit() is shown below:
//Mainly obtains the proxy object the client uses for RPC communication with the ResourceManager
connect();
//Get the JobSubmitter object
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
// Call JobSubmitter's submitJobInternal method
return submitter.submitJobInternal(Job.this, cluster);
}
});
Next, look at the connect() method:
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
return new Cluster(getConfiguration());
}
});
}
}
This method creates the proxy used to submit the job; let's see how it works:
public Cluster(Configuration conf) throws IOException {
this(null, conf);
}
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
initialize(jobTrackAddr, conf);
}
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
/*
List<ClientProtocolProvider> localProviderList =
new ArrayList<ClientProtocolProvider>();
for (ClientProtocolProvider provider : frameworkLoader) {
localProviderList.add(provider);
}
*/
//initProviderList() runs the code commented out above; frameworkLoader wraps the LocalClientProtocolProvider and YarnClientProtocolProvider objects
initProviderList();
//providerList holds the LocalClientProtocolProvider and YarnClientProtocolProvider objects, which represent running the MR program locally and on YARN respectively
for (ClientProtocolProvider provider : providerList) {
ClientProtocol clientProtocol = null;
if (jobTrackAddr == null) {
//Here we obtain the local proxy object used for RPC communication
clientProtocol = provider.create(conf);
} else {
//Here we obtain the YARN proxy object used for RPC communication
clientProtocol = provider.create(jobTrackAddr, conf);
}
if (clientProtocol != null) {
clientProtocolProvider = provider;
//Save it into the Cluster's client member field
client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName()
+ " as the ClientProtocolProvider");
break;
}
}
Next, look at submitter.submitJobInternal:
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
//Check whether the job output path already exists; if it does, throw an exception
checkSpecs(job);
// Create the staging path used to submit data to the cluster
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// Get the job id and create the job path
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
// Create the job path and copy the jar to the cluster
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Compute splits and generate the split metadata files
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
// Write the XML configuration files to the staging path
writeConf(conf, submitJobFile);
//The client is ready; ask the ResourceManager to run the job
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
if (jtFs != null && submitJobDir != null)
// Delete the staging files
jtFs.delete(submitJobDir, true);
}
}
}
TextInputFormat Split Source Code
Next, how does int maps = writeSplits(job, submitJobDir) compute the splits? It calls writeNewSplits(job, jobSubmitDir):
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
Now look at writeNewSplits, which calls getSplits:
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
// Create the InputFormat object via reflection
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
// Call the InputFormat's getSplits method; the default InputFormat is TextInputFormat
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
Arrays.sort(array, new SplitComparator());
// Write the split information into a split metadata file
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
Now look at getSplits:
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
/*
protected long getFormatMinSplitSize() {
return 1;
}
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
*/
// Minimum split size: getFormatMinSplitSize() returns 1 and getMinSplitSize(job) defaults to 1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
/*
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration().getLong(SPLIT_MAXSIZE,
Long.MAX_VALUE);
}
*/
// Maximum split size: Long.MAX_VALUE by default
long maxSize = getMaxSplitSize(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
// Iterate over all files in the input directory
for (FileStatus file: files) {
Path path = file.getPath();
// Get the file length
long length = file.getLen();
if (length != 0) {
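// (In the full Hadoop source, BlockLocation[] blkLocations for this file is obtained here;
//  it is elided in this excerpt but is what getBlockIndex() and makeSplit() use below.)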
// The file can be split
if (isSplitable(job, path)) {
// blockSize is 32 MB in local mode and 128 MB on a cluster
long blockSize = file.getBlockSize();
/*
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
*/
// Determine the split size; by default it equals the block size
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// private static final double SPLIT_SLOP = 1.1;
// Before each cut, check whether the remaining bytes exceed 1.1x the split size; if not, the remainder becomes a single split
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
TextInputFormat Split Mechanism
- Files are split simply by their content length.
- The split size defaults to the block size.
- Splitting does not consider the data set as a whole; every file is split on its own.
- How the split size is determined (a small sketch of this computation follows the API snippet below):
In FileInputFormat, the split size is computed as Math.max(minSize, Math.min(maxSize, blockSize)); it is driven mainly by these parameters:
mapreduce.input.fileinputformat.split.minsize, default 1
mapreduce.input.fileinputformat.split.maxsize, default Long.MAX_VALUE
So by default, split size = blocksize.
maxsize (maximum split size): if set smaller than blocksize, the split shrinks and equals this configured value.
minsize (minimum split size): if set larger than blockSize, the split becomes larger than blocksize.
- A data split is only a logical partitioning of the input; the data is not physically cut into pieces on disk. An InputSplit records only metadata about the split, such as its start offset, its length, and the list of nodes it resides on.
- The split metadata files are submitted to YARN, and the MrAppMaster on YARN uses them to compute how many maptasks to start.
- API for obtaining split information:
// Get the split information according to the file type
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// Get the name of the file the split belongs to
String name = inputSplit.getPath().getName();
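To make the Math.max(minSize, Math.min(maxSize, blockSize)) rule concrete, here is a minimal, self-contained sketch (plain Java, not Hadoop code; the class name and sample sizes are made up for illustration):
public class SplitSizeDemo {
    // Same formula as FileInputFormat.computeSplitSize()
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024; // 128 MB block
        // Defaults (minsize = 1, maxsize = Long.MAX_VALUE): split size = block size
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));                 // 134217728
        // maxsize set below the block size: the split shrinks to maxsize (64 MB)
        System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));              // 67108864
        // minsize set above the block size: the split grows to minsize (256 MB)
        System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE)); // 268435456
    }
}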
CombineTextInputFormat Split Mechanism
Optimization strategy for large numbers of small files
By default TextInputFormat plans splits per file: no matter how small a file is, it becomes its own split and is handed to its own maptask. With a large number of small files this creates a large number of maptasks, and processing becomes extremely inefficient.
- Optimization strategy
1) Best: merge the small files into large ones at the very front of the data pipeline (pre-processing/ingestion), before uploading to HDFS for later analysis.
2) Remedy: if a large number of small files is already in HDFS, use a different InputFormat for splitting, CombineTextInputFormat. Its split logic differs from TextInputFormat: it can logically pack multiple small files into one split, so several small files are handed to a single maptask.
It first satisfies the minimum split size while not exceeding the maximum split size:
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
Example: 0.5 MB + 1 MB + 0.3 MB + 5 MB → 2 MB + 4.8 MB → 2 MB + 4 MB + 0.8 MB, i.e. the small files plus the first 0.2 MB of the 5 MB file fill one 2 MB split, and the remaining 4.8 MB exceeds the 4 MB maximum, so it is cut into 4 MB + 0.8 MB — three splits in total.
Implementation steps:
// If no InputFormat is set, TextInputFormat.class is used by default
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
Using CombineTextInputFormat
Run the program from the previous article, which sums each phone number's upstream, downstream, and total traffic, with TextInputFormat and observe how many maptasks are launched.
Modified Driver:
public class CombineTextFlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
InputFormat Interface Implementations
The input files of a MapReduce job are usually stored in HDFS. Input formats include line-based log files, binary files, and so on, and the files are often large: tens of GB or more. How does MapReduce read this data? Let's start with the InputFormat interface.
Common InputFormat implementations include TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, and custom InputFormats.
The class hierarchy of the InputFormat implementations is shown in the diagram (omitted here).
- TextInputFormat is the default InputFormat. Each record is one line of input. The key, of type LongWritable, is the byte offset of the line within the file; the value is the content of the line, excluding any line terminators (newline or carriage return).
- KeyValueTextInputFormat: each line is one record, split by a separator into key and value. The separator is set in the driver with conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); the default separator is tab (\t).
- NLineInputFormat: each map task's InputSplit is no longer defined by HDFS blocks but by the number of lines N specified for NLineInputFormat. The number of splits is total lines / N; if this does not divide evenly, the number of splits is the quotient + 1 (e.g. 28 lines with N = 5 gives 6 splits).
Using KeyValueTextInputFormat
Requirement
For every distinct first word, count how many lines of the input file start with that word.
Input file
java hello
python hello
java hi
python hi
java hadoop
java hdfs
java mapreduce maptask reducetask
Output file
java 5
python 2
Implementation
Mapper
public class KeyValueMapper extends Mapper<Text, Text, Text, IntWritable> {
private IntWritable v = new IntWritable(1);
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(key, v);
}
}
Reducer
public class KeyValueReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value: values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
Driver
public class KeyValueDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
Configuration configuration = new Configuration();
configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
Job job = Job.getInstance(configuration);
job.setJarByClass(KeyValueDriver.class);
job.setMapperClass(KeyValueMapper.class);
job.setReducerClass(KeyValueReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
Using NLineInputFormat
Requirement
Count the occurrences of each word, but determine the number of splits from the number of lines in each input file: every five lines go into one split.
Input data
hello java
hello scala
hello hadoop
hello yarn
hello mapreduce
hello hive
hello hbase
hello java
hello scala
hello hadoop
hello yarn
hello mapreduce
hello hive
hello hbase
hello java
hello scala
hello hadoop
hello yarn
hello mapreduce
hello hive
hello hbase
hello java
hello scala
hello hadoop
hello yarn
hello mapreduce
hello hive
hello hbase
Implementation
The Mapper and Reducer are the same as the WordCount code in article 10.
Only the Driver needs to change:
public class WordCountNLineDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 5);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
From the job output, check how many maptasks were started: 28 input lines at 5 lines per split gives 28 / 5 = 5 remainder 3, i.e. 6 splits and therefore 6 maptasks.
Custom InputFormat
Requirement
Merge many small files into a single SequenceFile (the Hadoop file format that stores key-value pairs in binary form). The SequenceFile holds multiple files, using file path + name as the key and file contents as the value.
Analysis
- Define a class that extends FileInputFormat:
1) Override isSplitable() to return false, so files are not split.
2) Override createRecordReader() to create and initialize a custom RecordReader.
- Write the RecordReader so that one call reads an entire file and wraps it as a KV pair.
- Use SequenceFileOutputFormat for the output to write the merged file.
Implementation
- Custom InputFormat
public class TotalInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
/**
* Whether the file can be split (false here: each file is read whole)
* @param context
* @param filename
* @return
*/
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
TotalRecordReader totalRecordReader = new TotalRecordReader();
totalRecordReader.initialize(split, context);
return totalRecordReader;
}
}
- 自定義RecordReader
public class TotalRecordReader extends RecordReader<NullWritable, BytesWritable> {
BytesWritable value = new BytesWritable();
private Configuration configuration;
private FileSplit split;
private boolean processed = false;
/**
* Initialize with the split and the task context
* @param split
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.split = (FileSplit) split;
this.configuration = context.getConfiguration();
}
/**
* Core logic: read the entire file in one call
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
// 1. Allocate a buffer the size of the split (i.e. the whole file)
byte[] contents = new byte[(int) split.getLength()];
Path path = split.getPath();
FileSystem fileSystem = FileSystem.get(configuration);
FSDataInputStream inputStream = fileSystem.open(path);
IOUtils.readFully(inputStream, contents, 0, contents.length);
value.set(contents, 0, contents.length);
processed = true;
inputStream.close();
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
/**
* Report progress
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public float getProgress() throws IOException, InterruptedException {
return processed ? 1 : 0;
}
/**
* Release resources
* @throws IOException
*/
@Override
public void close() throws IOException {
}
}
- Mapper
public class TotalMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
private Text text = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) context.getInputSplit();
Path path = fileSplit.getPath();
text.set(path.toString());
}
@Override
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(text, value);
}
}
- Reducer
public class TotalReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
@Override
protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
for(BytesWritable bytesWritable: values) {
context.write(key, bytesWritable);
}
}
}
- Driver
public class TotalDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(TotalDriver.class);
job.setInputFormatClass(TotalInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(TotalMapper.class);
job.setReducerClass(TotalReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
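To verify the merged output, a small reader sketch follows. It is not from the original article; the path output/part-r-00000 is an assumption, and only the standard SequenceFile.Reader API is used:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed location of the job output; adjust to your own path.
        Path path = new Path(args.length > 0 ? args[0] : "output/part-r-00000");
        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();                     // file path + name
            BytesWritable value = new BytesWritable(); // file contents
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}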
How MapTask Works
- Read phase: the MapTask uses the user-supplied RecordReader to parse keys/values out of the input InputSplit.
- Map phase: each parsed key/value is handed to the user's map() function, which produces a series of new keys/values.
- Collect phase: inside the user's map() function, results are normally emitted with OutputCollector.collect(). Internally this partitions the generated key/value (by calling the Partitioner) and writes it into a circular in-memory buffer.
- Spill phase: when the circular buffer fills up, MapReduce writes the data to local disk as a temporary file. Before writing, the data is locally sorted and, if necessary, combined and compressed.
Spill phase in detail:
1) The buffer contents are sorted with quicksort, first by partition number and then by key, so that after sorting the data is clustered by partition and ordered by key within each partition.
2) Each partition's data is written, in increasing partition order, to a temporary file output/spillN.out in the task working directory (N is the current spill count). If a Combiner is configured, each partition's data is aggregated once before being written.
3) The partitions' metadata is recorded in the in-memory index structure SpillRecord; per partition it holds the offset in the temporary file and the data sizes before and after compression. If the in-memory index exceeds 1 MB, it is written to the file output/spillN.out.index.
- Combine phase: when all data has been processed, the MapTask merges all temporary files once so that only a single data file remains.
When all data has been processed, the MapTask merges all temporary files into one large file saved as output/file.out, together with the corresponding index file output/file.out.index.
The merge is done partition by partition. For each partition a multi-round recursive merge is used: each round merges io.sort.factor (default 10) files and adds the resulting file back to the list to be merged; after sorting the files, the process repeats until a single large file remains.
Having each MapTask produce only one final data file avoids the overhead of opening many files at once and the random reads caused by reading many small files at the same time.
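As a small, hedged example (assuming a Hadoop 2.x job; in current releases the io.sort.factor key is named mapreduce.task.io.sort.factor), the merge fan-in can be raised in the driver:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MergeFactorConfig {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Merge up to 20 spill files per round instead of the default 10,
        // which reduces the number of merge passes for MapTasks that spill often.
        conf.setInt("mapreduce.task.io.sort.factor", 20);
        Job job = Job.getInstance(conf, "tuned-merge-factor");
        // ... the rest of the job setup is unchanged ...
    }
}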
How ReduceTask Works
- Copy phase: the ReduceTask remotely copies a piece of data from each MapTask; if a piece exceeds a size threshold it is written to disk, otherwise it stays in memory.
- Merge phase: while copying, the ReduceTask runs two background threads that merge the in-memory and on-disk files, to avoid using too much memory or accumulating too many files on disk.
- Sort phase: by MapReduce semantics, the user's reduce() function receives data grouped by key. To bring equal keys together Hadoop relies on sorting; since every MapTask has already partially sorted its own output, the ReduceTask only needs one merge sort over all the data.
- Reduce phase: the reduce() function writes its results to HDFS.
Setting ReduceTask Parallelism (Count)
The ReduceTask parallelism likewise affects the overall concurrency and efficiency of the job, but unlike the number of MapTasks, which is determined by the number of splits, the number of ReduceTasks can be set directly by hand:
// The default is 1; here it is manually set to 5
job.setNumReduceTasks(5);
Notes
- If ReduceTask = 0, there is no Reduce phase and the number of output files equals the number of MapTasks.
- The default ReduceTask count is 1, so there is a single output file.
- If the data is unevenly distributed, data skew may occur in the Reduce phase.
- The ReduceTask count cannot be chosen arbitrarily; business requirements matter too: when a single global aggregate is needed, there can be only one ReduceTask.
- The concrete number of ReduceTasks should be decided based on cluster capacity.
- If the number of partitions is greater than 1 but ReduceTask is 1, the partitioning step is not executed: in the MapTask source, partitioning only happens after checking whether the number of reduces is greater than 1, as shown below.
collector = createSortingCollector(job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
Shuffle Mechanism
The data processing that takes place after the map() method and before the reduce() method is called Shuffle.
Partitioning
Problem: the statistics must be written to different files (partitions) according to some condition, e.g. writing phone-traffic statistics to different files by the province the phone number belongs to.
- Default partitioner
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
The default partitioning takes the key's hashCode modulo the number of reduceTasks; the user has no control over which key lands in which partition.
- Steps for a custom Partitioner
1) Define a class that extends Partitioner and override getPartition().
2) Register the custom partitioner in the job driver.
3) After customizing the partitioner, set a matching number of reduce tasks according to its logic.
- Notes
If the number of reduceTasks > the number of distinct getPartition results, several empty output files part-r-000xx are produced.
If 1 < number of reduceTasks < number of getPartition results, some partitioned data has nowhere to go and an Exception is thrown.
If the number of reduceTasks = 1, then no matter how many partition files the map side produces, everything goes to that single reduceTask and only one result file, part-r-00000, is produced.
Partitioning Example
Requirement
Write the statistics to different files (partitions) according to the province the phone number belongs to.
Input data
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網(wǎng)站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.# 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.# 站點統(tǒng)計 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統(tǒng)計 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
Analysis
- MapReduce groups the kv pairs emitted by the map phase by key and distributes them to different reducetasks. The default distribution rule is key.hashCode() % number of reducetasks.
- To distribute data by our own rule, we must replace the distribution (grouping) component, Partitioner:
define a CustomPartitioner that extends the abstract class Partitioner.
- Register the custom partitioner in the job driver: job.setPartitionerClass(CustomPartitioner.class)
Implementation
FlowBean class
public class FlowBean implements WritableComparable<FlowBean> {
private int up;
private int down;
private int total;
public FlowBean() {
}
public FlowBean(int up, int down) {
this.up = up;
this.down = down;
this.total = up + down;
}
public int getUp() {
return up;
}
public void setUp(int up) {
this.up = up;
}
public int getDown() {
return down;
}
public void setDown(int down) {
this.down = down;
}
public int getTotal() {
return total;
}
public void setTotal(int total) {
this.total = total;
}
public void set(int up, int down) {
this.up = up;
this.down = down;
this.total = up + down;
}
@Override
public String toString() {
return this.up + "\t" + this.down + "\t" + this.total;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(up);
dataOutput.writeInt(down);
dataOutput.writeInt(total);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.up = dataInput.readInt();
this.down = dataInput.readInt();
this.total = dataInput.readInt();
}
}
Mapper class
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private Text ke = new Text();
FlowBean flowBean = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strings = value.toString().split("\t");
ke.set(strings[1]);
flowBean.set(Integer.parseInt(strings[8]), Integer.parseInt(strings[9]));
context.write(ke, flowBean);
}
}
Reducer class
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
int totalUp = 0;
int totalDown = 0;
for(FlowBean flowBean:values) {
totalUp += flowBean.getUp();
totalDown += flowBean.getDown();
}
context.write(key, new FlowBean(totalUp, totalDown));
}
}
Partitioner class
public class FlowPartiton extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
if(text.toString().startsWith("135")){
return 0;
}else if(text.toString().startsWith("136")){
return 1;
}else if(text.toString().startsWith("137")){
return 2;
}else if(text.toString().startsWith("138")){
return 3;
}else{
return 4;
}
}
}
Driver class
public class UsePartitionFlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(FlowDriver.class);
job.setPartitionerClass(FlowPartiton.class);
job.setNumReduceTasks(5);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
WritableComparable Sorting
Sorting is one of the most important operations in the MapReduce framework. Both Map Task and Reduce Task sort their data by key. This is default Hadoop behaviour: data in any application gets sorted whether the logic needs it or not. The default order is lexicographic, implemented with quicksort.
A Map Task temporarily puts its results into a buffer; when buffer usage reaches a threshold, the buffered data is sorted once and spilled to disk, and when processing finishes, all files on disk are merged into one large sorted file.
A Reduce Task remotely copies the corresponding data file from every Map Task; files larger than a threshold go to disk, smaller ones stay in memory. When the number of files on disk reaches a threshold, they are merged into a larger file; when the size or number of in-memory files exceeds a threshold, they are merged and written to disk. Once all data has been copied, the Reduce Task performs one final merge over everything in memory and on disk.
Types of sorting
- Partial sort
Sorting within a partition (in the circular buffer): MapReduce sorts the data set by the keys of the input records, guaranteeing that each output file is internally sorted.
- Total sort
The final output is a single, internally sorted file, achieved by using only one ReduceTask. This is extremely inefficient for large inputs, because a single machine processes all the data and the parallelism MapReduce provides is lost.
- Auxiliary (grouping) sort
GroupingComparator grouping: group keys on the Reduce side. Useful when the key is a bean and you want keys whose fields are partly (but not entirely) equal to enter the same reduce() call.
- Secondary sort
If compareTo() in a custom sort uses two conditions, that is a secondary sort. A bean implements the WritableComparable interface and overrides compareTo() to define the custom ordering.
WritableComparable Sorting Example
Requirement
Building on the partitioning example above, sort by total traffic.
Analysis
- Split the program into two steps: step one computes the totals as before; step two sorts the results.
- In step two, context.write(total-traffic bean, phone number) — see the Mapper sketch after the modified FlowBean below.
- FlowBean implements the WritableComparable interface and overrides compareTo().
Implementation
Modified FlowBean class
public class FlowBean implements WritableComparable<FlowBean> {
private int up;
private int down;
private int total;
public FlowBean() {
}
public FlowBean(int up, int down) {
this.up = up;
this.down = down;
this.total = up + down;
}
public int getUp() {
return up;
}
public void setUp(int up) {
this.up = up;
}
public int getDown() {
return down;
}
public void setDown(int down) {
this.down = down;
}
public int getTotal() {
return total;
}
public void setTotal(int total) {
this.total = total;
}
public void set(int up, int down) {
this.up = up;
this.down = down;
this.total = up + down;
}
@Override
public String toString() {
return this.up + "\t" + this.down + "\t" + this.total;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(up);
dataOutput.writeInt(down);
dataOutput.writeInt(total);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.up = dataInput.readInt();
this.down = dataInput.readInt();
this.total = dataInput.readInt();
}
@Override
public int compareTo(FlowBean o) {
if(total < o.getTotal()) {
return -1;
}else if(total > o.getTotal()) {
return 1;
}else {
return 0;
}
}
}
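The article only shows the modified FlowBean; below is a minimal sketch of what the second-step (sorting) Mapper could look like. The class name SortFlowMapper and the assumed input layout phone \t up \t down \t total (i.e. the first job's output) are assumptions, not code from the original:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emit the FlowBean as the key so the framework sorts by its compareTo()
// (i.e. by total traffic), and the phone number as the value.
public class SortFlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean flowBean = new FlowBean();
    private Text phone = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed layout of the first job's output: phone \t up \t down \t total
        String[] fields = value.toString().split("\t");
        phone.set(fields[0]);
        flowBean.set(Integer.parseInt(fields[1]), Integer.parseInt(fields[2]));
        context.write(flowBean, phone);
    }
}
In the matching Driver, setMapOutputKeyClass(FlowBean.class) and setMapOutputValueClass(Text.class) would be used, and the Reducer would simply write (phone, bean) back out in sorted order.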
Combiner
- A combiner is a component of an MR program besides the Mapper and Reducer.
- The combiner's parent class is Reducer.
- The difference between a combiner and a reducer is where they run:
the Combiner runs on the node of each maptask;
the Reducer receives the output of all Mappers globally.
- The point of a combiner is to perform a local aggregation of each maptask's output, to reduce the amount of data sent over the network.
- A combiner may only be used when it does not affect the final business result, and its output kv types must match the reducer's input kv types.
For example, computing an average with a combiner goes wrong:
Mapper side
3 5 7 => (3 + 5 + 7) / 3 = 5
2 4 => (2 + 4) / 2 = 3
Reducer side
[(5 + 3) / 2 = 4] != [(3 + 5 + 7 + 2 + 4) / 5 = 4.2]
- Steps to implement a custom Combiner
1) Define a combiner class that extends Reducer and override reduce().
2) Register it in the job driver class.
Combiner Example
Requirement
WordCount, but perform a local aggregation of each maptask's output during the job to reduce network traffic, i.e. use the Combiner feature.
Analysis
Option 1:
- Add a WordCountCombine class that extends Reducer.
- In WordCountCombine, sum the word counts and emit the result.
Option 2:
Register the existing WordCountReducer as the combiner in the WordCount driver class.
Implementation
WordCountReducer
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
WordCountMapper
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text ke = new Text();
private IntWritable valu = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strings = value.toString().split(" ");
for(String string : strings) {
ke.set(string);
context.write(ke, valu);
}
}
}
Option 1
New WordCountCombine class
public class WordCountCombine extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
Driver
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setCombinerClass(WordCountCombine.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setNumReduceTasks(3);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
Option 2
Driver
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setCombinerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setNumReduceTasks(3);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
}
}
Results
Option 1 and option 2 produce the same output (the run screenshots are omitted here); the effect of the combiner shows up in the job counters (Combine input records / Combine output records) rather than in the final result.
Option 2 is simpler than option 1.
GroupingComparator Grouping
Auxiliary sorting: group the data in the reduce phase by one or more fields of the key.
GroupingComparator Example
Requirement
Find the most expensive product in each order.
Input data
0000001 Pdt_01 222.8
0000002 Pdt_06 722.4
0000001 Pdt_05 25.8
0000003 Pdt_01 222.8
0000003 Pdt_01 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
Analysis
- Use "order id + amount" as the key: all order records read in the map phase can then be partitioned by order id and sorted by amount before being sent to reduce.
- On the reduce side, use a groupingcomparator to group the kv pairs with the same order id; the first record of each group is then the maximum.
Implementation
OrderBean
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId;
private String productId;
private Double price;
public OrderBean() {
}
public void set(String orderId, String productId, Double price) {
this.orderId = orderId;
this.productId = productId;
this.price = price;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public int compareTo(OrderBean o) {
if(orderId.compareTo(o.getOrderId()) == 0) {
return -price.compareTo(o.getPrice());
}else {
return orderId.compareTo(o.getOrderId());
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.orderId);
out.writeUTF(this.productId);
out.writeDouble(this.price);
}
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.productId = in.readUTF();
this.price = in.readDouble();
}
@Override
public String toString() {
return "orderId='" + orderId + '\'' +
", productId='" + productId + '\'' +
", price=" + price;
}
}
OrderMapper
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
private OrderBean orderBean = new OrderBean();
private NullWritable val = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strings = value.toString().split("\t");
orderBean.set(strings[0], strings[1], Double.parseDouble(strings[2]));
context.write(orderBean, val);
}
}
OrderReducer
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// Because records are sorted by descending price within each order and the
// grouping comparator groups by order id, the first key of each group is the
// most expensive item; write it once instead of iterating over all values.
context.write(key, NullWritable.get());
}
}
OrderPartitioner
public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
@Override
public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
OrderGrouping
public class OrderGrouping extends WritableComparator {
protected OrderGrouping() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean oa = (OrderBean) a;
OrderBean ob = (OrderBean) b;
return oa.getOrderId().compareTo(ob.getOrderId());
}
}
OrderDriver
public class OrderDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(OrderDriver.class);
job.setReducerClass(OrderReducer.class);
job.setMapperClass(OrderMapper.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
job.setPartitionerClass(OrderPartitioner.class);
job.setGroupingComparatorClass(OrderGrouping.class);
job.setNumReduceTasks(2);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean isSuccess = job.waitForCompletion(true);
}
}