Preface
Reduce pulls many small files from the Mapper tasks. Each file is internally sorted, but the set as a whole is not, so Reduce merges the small files and applies a merge-sort pass to turn them into a single, globally sorted file.
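To make the "merge many sorted runs" idea concrete, here is a minimal plain-Java sketch (not Hadoop code; the class name is made up) of a k-way merge: a priority queue always yields the smallest head element among the sorted runs.

import java.util.*;

public class KWayMergeSketch {
    public static List<Integer> merge(List<List<Integer>> runs) {
        // Each queue entry is {value, runIndex, offsetWithinRun}, ordered by value.
        PriorityQueue<int[]> pq =
            new PriorityQueue<>(Comparator.comparingInt((int[] e) -> e[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                pq.add(new int[]{runs.get(r).get(0), r, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!pq.isEmpty()) {
            int[] head = pq.poll();
            out.add(head[0]);                     // smallest remaining element overall
            int run = head[1], next = head[2] + 1;
            if (next < runs.get(run).size()) {    // refill from the run we just consumed
                pq.add(new int[]{runs.get(run).get(next), run, next});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> runs = Arrays.asList(
            Arrays.asList(1, 4, 7), Arrays.asList(2, 5), Arrays.asList(3, 6, 8));
        System.out.println(merge(runs)); // [1, 2, 3, 4, 5, 6, 7, 8]
    }
}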
The Reducer has 3 basic phases:
1.Shuffle phase
The Reducer copies the Mappers' sorted output over the network.
2.Sort phase
- The reducer input is sorted by key (because different mappers may have output the same key).
- The shuffle and sort phases occur simultaneously: mapper outputs are merged as they are fetched.
3.Reduce phase
In this phase, reduce(Object, Iterable, Reducer.Context) is called once for each group in the sorted input. The output of a reduce task is usually written to a RecordWriter via Reducer.Context.write(Object, Object). The reduce output is not re-sorted.
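As a concrete reference for the phases above, a typical user-defined reducer against the new API looks like the classic word-count reducer below (standard Hadoop API; the class itself is illustrative): reduce() runs once per key group and writes through context.write().

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();        // all values of one key group, fed by the merged iterator
        }
        result.set(sum);
        context.write(key, result);  // output is written as-is, not re-sorted
    }
}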
Source Code Analysis
1.Shuffle phase source code analysis
@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// Start the reporter that sends task status reports to the parent process
TaskReporter reporter = startReporter(umbilical);
// Determine whether the new MapReduce API or the old API is in use
boolean useNewApi = job.getUseNewReducer();
// Core code: initialize the task
initialize(job, getJobID(), reporter, useNewApi);
// There are 4 kinds of reduce-side tasks: Job-setup Task, Job-cleanup Task, Task-cleanup Task, and ReduceTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
// The shuffle plugin to use
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
// Core code: initialize the shuffle plugin
shuffleConsumerPlugin.init(shuffleContext);
// Core code: run the shuffle. This step pulls the map-side output over the network and merges it
rIter = shuffleConsumerPlugin.run();
// free up the data structures
mapOutputFilesOnDisk.clear();
// sort is complete
sortPhase.complete();
setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
// Grouping comparator
RawComparator comparator = job.getOutputValueGroupingComparator();
// If none of the 3 special task types above applies, run the main ReduceTask, dispatching by new/old API
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
First, the shuffle phase: the Reduce process starts a number of data-copy threads, which issue HTTP requests to the NodeManagers where the MapTasks ran in order to obtain their output files.
How does the Reducer know which machines to fetch data from?
Once a map task completes, it notifies the application's ApplicationMaster through the regular heartbeat. A thread in the reduce task periodically asks the master for map output locations until all the data has been fetched. After a reducer has pulled the data, the map machine does not delete it immediately, in case the reduce task fails and has to be redone; map output is therefore only deleted once the entire job has completed.
A NodeManager runs the reduce task for a partition file, and that reduce task needs the map output of several map tasks across the cluster as its particular partition. Since each map task may finish at a different time, the reduce task starts copying output as soon as any one map task completes.
The reduce task has a small pool of copier threads, so it can fetch map outputs in parallel. The default is 5 threads, but this can be changed via the mapreduce.reduce.shuffle.parallelcopies property.
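As a sketch, the property can be set in the driver code; the class name ShuffleTuning and the value 10 below are illustrative only.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ShuffleTuning {
    public static Job jobWithMoreCopiers() throws IOException {
        Configuration conf = new Configuration();
        // Default is 5 copier threads; a shuffle-heavy job on a large cluster
        // may benefit from more. The value 10 is purely illustrative.
        conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
        return Job.getInstance(conf, "shuffle-tuning-example");
    }
}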
Because each map in the job partitions its output into n pieces according to the number of reduces (n), every map's intermediate output may contain part of the data that each reducer needs. So, to shorten the reducers' execution time, Hadoop has all the reduces start trying to download their corresponding partitions from completed mappers (the shuffle) as soon as the job's first map finishes; map and reduce therefore overlap.
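The split into n partitions is done by the job's partitioner; Hadoop's default HashPartitioner amounts to the logic below.

import org.apache.hadoop.mapreduce.Partitioner;

// Essentially what Hadoop's default HashPartitioner does: every map task
// routes each record to one of numReduceTasks partitions, one per reducer.
public class SimpleHashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask the sign bit so the result is non-negative, then bucket by reducer count.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}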
@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
// TODO: This should not be necessary after HADOOP-8942
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher =
new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.shutDown();
}
// stop the scheduler
scheduler.close();
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
The data copied from the map side is first placed in an in-memory buffer.
Merge takes 3 forms: memory-to-memory, memory-to-disk, and disk-to-disk. By default the first form is disabled. The second form runs continuously (the spill phase) until copying finishes, and then the third, disk-to-disk merge produces the final file. (Note: to be merged, compressed map output must be decompressed in memory.)
Once the in-memory buffer reaches the spill threshold (0.66 by default), or the number of map outputs held in memory before spilling reaches its threshold (1000 by default), the buffered outputs are merged and spilled to disk. If a combiner is specified, it runs during the merge to reduce the amount of data written to disk.
As copies accumulate on disk, a background thread merges them into larger, sorted files, which saves time in later merges.
After all map outputs have been copied, the reduce task enters the sort phase (the merge phase would be a better name, since the main work here is a merge sort), performing rounds of merging while preserving sort order. The number of file streams merged at a time is governed by the mapreduce.task.io.sort.factor property (10 by default).
This merge resembles the map-side merge, except that the values being merged were copied from different maps. Copied data first goes into the in-memory buffer and spills to disk only once memory usage reaches a certain level. The buffer size here is more flexible than on the map side: it is based on the JVM heap size.
This memory limit is not set through io.sort.mb as on the map side, but through another parameter, mapreduce.reduce.shuffle.input.buffer.percent (default 0.7f, hard-coded as the default in the source). It is a percentage: shuffle data held in reduce memory may use at most 0.7 × the reduce task's max heap, i.e. 70% of the JVM heap. The trigger for the memory-to-disk merge is configured with mapreduce.reduce.shuffle.merge.percent (default 0.66).
In other words, a fixed fraction of the reduce task's max heap (usually set via mapreduce.admin.reduce.child.java.opts, e.g. -Xmx1024m) is used to cache data. By default a reducer caches data in 70% of its heap size. With mapreduce.reduce.shuffle.input.buffer.percent at 0.7 and a 1 GB max heap, roughly 700 MB of memory serves as the download cache. As on the map side, these 700 MB are not flushed to disk only when completely full: once a certain fraction is in use (again a percentage), flushing to disk begins, with a sort-merge performed before the flush.
That fraction is likewise configurable, via mapreduce.reduce.shuffle.merge.percent (default 0.66). As on the map side this is a spill process, and if you have configured a Combiner it runs here too, producing many spill files on disk. This memory-to-disk merge keeps running until no more map-side data arrives, after which the disk-to-disk merge starts and generates the final file.
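A back-of-the-envelope sketch of how those thresholds work out, assuming the 1 GB heap example above and the default percentages (the class name is illustrative):

public class ShuffleBufferMath {
    public static void main(String[] args) {
        long maxHeap = 1024L * 1024 * 1024;  // -Xmx1024m
        double inputBufferPercent = 0.70;    // mapreduce.reduce.shuffle.input.buffer.percent
        double mergePercent = 0.66;          // mapreduce.reduce.shuffle.merge.percent

        long shuffleBuffer = (long) (maxHeap * inputBufferPercent);
        long spillThreshold = (long) (shuffleBuffer * mergePercent);

        System.out.printf("shuffle buffer : ~%d MB%n", shuffleBuffer >> 20);  // ~716 MB
        System.out.printf("spill threshold: ~%d MB%n", spillThreshold >> 20); // ~473 MB
    }
}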
runNewReducer source code walkthrough
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
// wrap value iterator to report progress.
//The real (underlying) iterator
final RawKeyValueIterator rawIter = rIter;
rIter = new RawKeyValueIterator() {
public void close() throws IOException {
rawIter.close();
}
public DataInputBuffer getKey() throws IOException {
return rawIter.getKey();
}
public Progress getProgress() {
return rawIter.getProgress();
}
public DataInputBuffer getValue() throws IOException {
return rawIter.getValue();
}
public boolean next() throws IOException {
boolean ret = rawIter.next();
reporter.setProgress(rawIter.getProgress().getProgress());
return ret;
}
};
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(), reporter);
// make a reducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
job.setBoolean("mapred.skip.on", isSkipping());
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
//The iterator is passed in when the context is built
rIter, reduceInputKeyCounter,
reduceInputValueCounter,
trackedRW,
committer,
reporter, comparator, keyClass,//comparator
valueClass);
try {
//After the context is built, run the Reducer's run() method
reducer.run(reducerContext);
} finally {
trackedRW.close(reducerContext);
}
}
First, a number of properties are wrapped into the reducerContext object, among them the most important one, rIter: this iterator object is responsible for the actual reading of the data. After that, run() is called, which executes setup() and then our user-defined reduce() method. Let's first look at how ReduceContextImpl drives the iteration!
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
//The real iterator is passed in as the input object
RawKeyValueIterator input,
Counter inputKeyCounter,
Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
RawComparator<KEYIN> comparator,
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
this.inputKeyCounter = inputKeyCounter;
this.inputValueCounter = inputValueCounter;
this.comparator = comparator;
this.serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(buffer);
this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
this.valueDeserializer.open(buffer);
hasMore = input.next();
this.keyClass = keyClass;
this.valueClass = valueClass;
this.conf = conf;
this.taskid = taskid;
}
/** Start processing next unique key. */
//This is what context.nextKey() in Reducer.run() actually invokes
public boolean nextKey() throws IOException,InterruptedException {
//On the first call nextKeyIsSame is false, so this loop body is skipped
while (hasMore && nextKeyIsSame) {
nextKeyValue();
}
if (hasMore) {
if (inputKeyCounter != null) {
inputKeyCounter.increment(1);
}
return nextKeyValue();
} else {
return false;
}
}
/**
* Advance to the next key/value pair.
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!hasMore) {
key = null;
value = null;
return false;
}
firstValue = !nextKeyIsSame;
DataInputBuffer nextKey = input.getKey();
currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition());
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
key = keyDeserializer.deserialize(key);
DataInputBuffer nextVal = input.getValue();
buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
- nextVal.getPosition());
value = valueDeserializer.deserialize(value);
currentKeyLength = nextKey.getLength() - nextKey.getPosition();
currentValueLength = nextVal.getLength() - nextVal.getPosition();
if (isMarked) {
backupStore.write(nextKey, nextVal);
}
hasMore = input.next();
if (hasMore) {
nextKey = input.getKey();
//Check whether the current key equals the next key
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
}
inputValueCounter.increment(1);
return true;
}
public KEYIN getCurrentKey() {
return key;
}
@Override
public VALUEIN getCurrentValue() {
return value;
}
Next comes the run() method of Reducer:
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
//This step actually calls nextKeyValue(), which updates hasMore, nextKeyIsSame, key, and value
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
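To make this grouping loop concrete, here is a plain-Java simulation (not Hadoop code; names are made up) of what nextKey()/nextKeyValue() and the nextKeyIsSame flag accomplish over the sorted input: each maximal run of equal keys becomes one reduce() group.

import java.util.*;

public class GroupingSimulation {
    public static void main(String[] args) {
        // Sorted (key, value) pairs, as they come out of the merged shuffle output.
        List<Map.Entry<String, Integer>> sorted = Arrays.asList(
            new AbstractMap.SimpleEntry<>("a", 1),
            new AbstractMap.SimpleEntry<>("a", 2),
            new AbstractMap.SimpleEntry<>("b", 3),
            new AbstractMap.SimpleEntry<>("b", 4));

        int i = 0;
        while (i < sorted.size()) {                        // plays the role of context.nextKey()
            String key = sorted.get(i).getKey();
            List<Integer> values = new ArrayList<>();
            // "nextKeyIsSame": keep advancing while the following key compares equal
            while (i < sorted.size() && sorted.get(i).getKey().equals(key)) {
                values.add(sorted.get(i).getValue());      // plays the role of nextKeyValue()
                i++;
            }
            System.out.println("reduce(" + key + ", " + values + ")");
        }
    }
}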
nextKey() calls through to the real iterator's methods, checking nextKeyIsSame as well as hasMore.
getValues() returns an iterable object of type ValueIterable, whose iterator() in turn returns a ValueIterator object. The ValueIterator source is shown below; the class is an inner class of ReduceContextImpl.
protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {
private boolean inReset = false;
private boolean clearMarkFlag = false;
@Override
public boolean hasNext() {
try {
if (inReset && backupStore.hasNext()) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("hasNext failed", e);
}
return firstValue || nextKeyIsSame;
}
@Override
public VALUEIN next() {
if (inReset) {
try {
if (backupStore.hasNext()) {
backupStore.next();
DataInputBuffer next = backupStore.nextValue();
buffer.reset(next.getData(), next.getPosition(), next.getLength()
- next.getPosition());
value = valueDeserializer.deserialize(value);
return value;
} else {
inReset = false;
backupStore.exitResetMode();
if (clearMarkFlag) {
clearMarkFlag = false;
isMarked = false;
}
}
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("next value iterator failed", e);
}
}
// if this is the first record, we don't need to advance
if (firstValue) {
firstValue = false;
return value;
}
// if this isn't the first record and the next key is different, they
// can't advance it here.
if (!nextKeyIsSame) {
throw new NoSuchElementException("iterate past last value");
}
// otherwise, go to the next key/value pair
try {
//The iterator itself holds no data: next() still calls nextKeyValue(), which reads from the input
nextKeyValue();
return value;
} catch (IOException ie) {
throw new RuntimeException("next value iterator failed", ie);
} catch (InterruptedException ie) {
// this is bad, but we can't modify the exception list of java.util
throw new RuntimeException("next value iterator interrupted", ie);
}
}
As you can see, this iterator does not itself hold any data: its iteration is carried out through the real iterator object rIter created earlier. Inside next() it still calls rIter's nextKeyValue(), which in turn reads from the input.
The real iterator has an important flag, nextKeyIsSame, which hasNext() uses to decide whether the next key is the same, thereby delimiting one group of data.
nextKeyValue() assigns key and value, uses hasMore to decide whether nextKeyIsSame can be evaluated, and then invokes the grouping comparator: a comparison result of 0 means true (same group).
Note that in a user-defined reduce() method, if you iterate over the values, the key belonging to each value is iterated along with them, because the key is passed by reference: the same piece of memory is overwritten in place. In other words, iterating over the values also "iterates" the key.
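A hedged illustration of that pitfall (the reducer below is made up for the example): to retain values across iterations you must copy them, because the framework reuses the same key/value objects.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CopyValuesReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<Text> kept = new ArrayList<>();
        for (Text val : values) {
            // kept.add(val) would store the SAME reused object N times, all showing
            // the last deserialized value (and the reused key object changes too).
            kept.add(new Text(val)); // take a defensive copy instead
        }
        context.write(key, new Text("count=" + kept.size()));
    }
}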
One more point worth noting:
If the user has set a grouping comparator, that user-defined grouping comparator is used; if not, the sort comparator serves as the grouping comparator; otherwise the key class's own default comparator is used.
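As a sketch of the first case, a custom grouping comparator can be installed in the driver via Job.setGroupingComparatorClass(); the FirstFieldComparator below is hypothetical and groups composite keys such as "user#timestamp" by the part before '#'.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FirstFieldComparator extends WritableComparator {
    public FirstFieldComparator() {
        super(Text.class, true); // true: instantiate keys so compare() receives objects
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        // Compare only the part before '#', so "user#t1" and "user#t2" fall in one group.
        String left = a.toString().split("#", 2)[0];
        String right = b.toString().split("#", 2)[0];
        return left.compareTo(right);
    }
}

// In the driver:
//   job.setGroupingComparatorClass(FirstFieldComparator.class);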