MapReduce執(zhí)行流程

數(shù)據(jù)處理總流程

MapReduce計算框架體現(xiàn)的是一個分治的思想。及將待處理的數(shù)據(jù)分片在每個數(shù)據(jù)分片上并行運行相同邏輯的map()函數(shù)惫皱,然后將每一個數(shù)據(jù)分片的處理結(jié)果匯集到reduce()函數(shù)進(jìn)行規(guī)約整理难礼,最后輸出結(jié)果彭沼。


file

總體上來說MapReduce的處理流程從邏輯上看并不復(fù)雜廉侧。對于應(yīng)用Hadoop進(jìn)行數(shù)據(jù)分析的開發(fā)人員來說肖方,只需實現(xiàn)map()方法和reduce()方法就能完成大部分的工作。正是因為Hadoop邏輯上和開發(fā)上都不復(fù)雜使它被廣泛的應(yīng)用于各行各業(yè)非洲。

Map階段

Map階段更為詳細(xì)的處理過程如圖所示:


file

一般情況下用戶需要處理分析的數(shù)據(jù)都在HDFS上鸭限。因此,MapReduce計算框架會是使用InputFormat(org.apache.hadoop.mapreduce)的子類將輸入數(shù)據(jù)分片(InputSplit)两踏。分片后的數(shù)據(jù)將作為MapTask的輸入里覆,MapTask會根據(jù)map()中的程序邏輯將數(shù)據(jù)分為K-V鍵值對。
為了更好的理解數(shù)據(jù)分片的過程和實現(xiàn)的邏輯缆瓣,本文以InputFormat的一個子類FileInputFormat為例研究數(shù)據(jù)分片的過程。
FileInputFormat類將數(shù)據(jù)分片虹统,然而這里所說的分片并不是將數(shù)據(jù)物理上分成多個數(shù)據(jù)塊而是邏輯分片弓坞。
PS:并不是所有文件都可以分片,比如gzip车荔,snappy壓縮的文件就無法分割 .
數(shù)據(jù)邏輯分片的核心方法是getSplits():

 public List<InputSplit> getSplits(JobContext job) throws IOException { 
    渡冻。。忧便。族吻。。。 
    List<InputSplit> splits = new ArrayList<InputSplit>(); 
    List<FileStatus> files = listStatus(job); 
    for (FileStatus file: files) { 
      Path path = file.getPath(); 
      long length = file.getLen(); 
      if (length != 0) { 
        BlockLocation[] blkLocations; 
        if (file instanceof LocatedFileStatus) { 
          blkLocations = ((LocatedFileStatus) file).getBlockLocations(); 
        } else { 
          FileSystem fs = path.getFileSystem(job.getConfiguration()); 
          blkLocations = fs.getFileBlockLocations(file, 0, length); 
        } 
        if (isSplitable(job, path)) { 
          long blockSize = file.getBlockSize(); 
          long splitSize = computeSplitSize(blockSize, minSize, maxSize); 
 
          long bytesRemaining = length; 
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 
            splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
                        blkLocations[blkIndex].getHosts(), 
                        blkLocations[blkIndex].getCachedHosts())); 
            bytesRemaining -= splitSize; 
          }           
          if (bytesRemaining != 0) { 
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
                       blkLocations[blkIndex].getHosts(), 
                       blkLocations[blkIndex].getCachedHosts())); 
          } 
        } else { // not splitable 
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), 
                      blkLocations[0].getCachedHosts())); 
        } 
      } else {  
        //Create empty hosts array for zero length files 
        splits.add(makeSplit(path, 0, length, new String[0])); 
      } 
    } 
    超歌。砍艾。。巍举。脆荷。。 
    return splits; 
  } 

其流程圖如下所示:


file

getSplits()中的BlockLocation類保存待處理文件的數(shù)據(jù)塊信息懊悯,它包含了數(shù)據(jù)塊所在DataNode的hostname蜓谋,帶有緩存副本的數(shù)據(jù)塊所在的節(jié)點的hostname,訪問數(shù)據(jù)塊所在DataNode的IP:端口號炭分,在拓?fù)渚W(wǎng)絡(luò)中的絕對路徑名桃焕,數(shù)據(jù)塊在整個數(shù)據(jù)文件中的偏移量,數(shù)據(jù)塊長度捧毛,是否是壞塊观堂。getSplits()會依據(jù)這些信息創(chuàng)建一個FileSplit完成一個邏輯分片,然后將所有的邏輯分片信息保存到List中岖妄。List中的InputSplit包含四個內(nèi)容型将,文件的路徑,文件開始的位置荐虐,文件結(jié)束的位置七兜,數(shù)據(jù)塊所在的host。
除了getSplits()方法另一比較重要的算法是computeSplitSize()方法福扬,它負(fù)責(zé)確定數(shù)據(jù)分片的大小腕铸,數(shù)據(jù)分片的大小對程序的性能會有一定的影響,最好將數(shù)據(jù)分片的大小設(shè)置的和HDFS中數(shù)據(jù)分片的大小一致铛碑。確定分片大小的算法是:

Math.max(minSize, Math.min(maxSize, blockSize)) 
set mapred.max.split.size=256000000;2.x版本默認(rèn)約是128M狠裹,我們集群配置的是256M 
set mapred.min.split.size=10000000;2.x版本默認(rèn)是約10M,我們集群配置的是1 
blockSize 在hdfs-site.xml參數(shù)dfs.block.size中配置汽烦,我們集群設(shè)置的是默認(rèn)的是134217728=128M 

set mapred.map.tasks 對map task數(shù)量僅僅是參考的作用涛菠,我們集群默認(rèn)的是2 
對應(yīng)的是set mapred.reduce.tasks,我們集群默認(rèn)的是-1 
reducer數(shù)量可能起作用的 
hive.exec.reducers.bytes.per.reducer=256000000 
hive.exec.reducers.max=1009 
min( hive.exec.reducers.max 撇吞,總輸入數(shù)據(jù)量/hive.exec.reducers.bytes.per.reducer) 

其中俗冻,minSize是配置文件中設(shè)置的分片最小值,minSize則為最大值牍颈,blockSize為HDFS中數(shù)據(jù)塊的大小迄薄。
完成邏輯分片后,F(xiàn)ileInputFormat的各個子類向MapTask映射k-v鍵值對(如TextInputFormat)煮岁。FileInputFormat的子類是對數(shù)據(jù)分片中的數(shù)據(jù)進(jìn)行處理讥蔽。

file

TextInputFormat中createRecorderReader()將InputSplit解析為k-v傳給mapTask涣易,該方法中用到了LineRecordReader它繼承自RecordReader。
file

MapTask最終是通過調(diào)用nextKeyValue()方法來遍歷分片中的數(shù)據(jù)并且將行數(shù)以及每一行的的數(shù)據(jù)分別作為key和value傳遞給map()方法冶伞。map()方法按照開發(fā)工程師編寫的邏輯對輸入的key和value進(jìn)行處理后會組成新的k-v對然后寫出到一個內(nèi)存緩沖區(qū)中新症。
每個MapTask都有一個內(nèi)存緩沖區(qū),對緩沖區(qū)讀寫是典型的生產(chǎn)者消費者模式碰缔。這里內(nèi)存緩沖區(qū)的結(jié)構(gòu)設(shè)計對MapTask的IO效率有著直接的影響账劲。Hadoop采用了環(huán)形內(nèi)存緩沖區(qū),當(dāng)緩沖區(qū)數(shù)據(jù)量達(dá)到閾值消費者線程SpillThread開始將數(shù)據(jù)寫出金抡,于此同時充當(dāng)生產(chǎn)者的writer()函數(shù)依然可以將處理完的數(shù)據(jù)寫入到緩沖區(qū)中瀑焦。生產(chǎn)者和消費者之間的同步是通過可重入互斥鎖spillLock來完成的。
在寫磁盤之前梗肝,線程會對緩沖區(qū)內(nèi)的數(shù)據(jù)進(jìn)行分區(qū)榛瓮,以決定各個數(shù)據(jù)會傳輸?shù)侥膫€Reduce中。而在每個分區(qū)中會按key進(jìn)行排序(如果此時有個Combiner則它會在排序后的輸出上運行一次巫击,以壓縮傳輸?shù)臄?shù)據(jù))

mapred-site.xml 文件中 
mapreduce.task.io .sort.mb=300M 
mapreduce.map.sort.spill.percent 配置的默認(rèn)只0.8 
file

用戶可以通過繼承Partitiner類并且實現(xiàn)getPartitioner()方法禀晓,從而定制自己的分區(qū)規(guī)則。默認(rèn)的分區(qū)規(guī)則是通過key的hashCode來完成分區(qū)的坝锰。
環(huán)形緩沖區(qū)在達(dá)到溢寫的閾值后粹懒,溢寫到磁盤(每次溢寫都會新建一個溢寫文件)最后合并溢寫文件,形成一個分區(qū)有序的中間結(jié)果顷级。另外可以對中間結(jié)果進(jìn)行壓縮凫乖,以減少傳輸?shù)臄?shù)據(jù)量。

Reduce階段

Reduce階段更為詳細(xì)的流程如下圖所示:


file

ReduceTask對數(shù)據(jù)進(jìn)行規(guī)約的第一步就是從MapTask的輸出磁盤上將數(shù)據(jù)拉取過來弓颈。這個過程重點分析shuffle類和Fetcher類帽芽。Shuffle類如下圖所示:


file

Shuffle類中的init()方法負(fù)責(zé)初始化Shuffle階段需要的上下文,并且在Shuffle的最后階段調(diào)用歸并排序方法翔冀。Shuffle類的核心方法為run()方法导街。
public RawKeyValueIterator run() throws IOException, InterruptedException {
    。纤子。搬瑰。。控硼。跌捆。 
    // Start the map-output fetcher threads 
    Boolean isLocal = localMapFiles != null;
    final int numFetchers = isLocal ? 1 : 
    jobConf.getint(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
    Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
    if (isLocal) {
        fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler, 
        merger, reporter, metrics, this, reduceTask.getShuffleSecret(), 
        localMapFiles);
        fetchers[0].start();
    } else {
        for (int i=0; i < numFetchers; ++i) {
            fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,  
            reporter, metrics, this,reduceTask.getShuffleSecret());
            fetchers[i].start();
        }
    }
    。象颖。。姆钉。说订。抄瓦。 
    eventFetcher.shutDown();
    for (Fetcher<K,V> fetcher : fetchers) {
        fetcher.shutDown();
    }
    scheduler.close();
    copyPhase.complete();
    // copy is already complete 
    taskStatus.setPhase(TaskStatus.Phase.SORT);
    reduceTask.statusUpdate(umbilical);
    RawKeyValueIterator kvIter = null;
    。陶冷。钙姊。。埂伦。煞额。 
    return kvIter;
}

在run()方法中它是通過啟動fetcher線程來拉取數(shù)據(jù)的。首先需要判斷將要拉取的數(shù)據(jù)是否具有本地性沾谜,如果數(shù)據(jù)在本地則直接傳入文件的地址否則創(chuàng)建fetcher線程來從其他節(jié)點遠(yuǎn)程拉取數(shù)據(jù)膊毁。Fetcher類類圖如下:


file

Fetcher繼承自Thread類因此它重寫了run()方法并且調(diào)用了copyFromHost()方法。copyFromHost()方法首先獲取指定host上運行完成的MapTaskID然后循環(huán)的從Map段讀取數(shù)據(jù)直到所有的數(shù)據(jù)都讀取完成基跑。

 protected void copyFromHost(MapHost host) throws IOException {
    婚温。。媳否。栅螟。。篱竭。 
    List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
    力图。。掺逼。吃媒。。坪圾。 
    while (!remaining.isEmpty() && failedTasks == null) {
        try {
            failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
        }
        catch (IOException e) {
            晓折。。兽泄。漓概。。
        }
    }
}

讀取數(shù)據(jù)是在copyMapOutput()方法中完成的病梢,方法中用到了ShufferHeader類它實現(xiàn)了Writable接口從而可以完成序列化與反序列化的工作胃珍,它調(diào)用readFields()方法從數(shù)據(jù)流中讀取數(shù)據(jù)。


file
mapreduce.task.io .sort.factor =25 

讀取數(shù)據(jù)過程中需要注意的是蜓陌,如果中間結(jié)果小則復(fù)制到內(nèi)存緩沖區(qū)中否則復(fù)制到本地磁盤中觅彰。當(dāng)內(nèi)存緩沖區(qū)達(dá)到大小閾值或者文件數(shù)閾值則溢寫到本地磁盤,與此同時后臺線程會不停的合并溢寫文件形成大的有序的文件钮热。
在Shuffle-copy階段進(jìn)行的同時Shuffle-Sort也在處理數(shù)據(jù)填抬,這個階段就是針對內(nèi)存中的數(shù)據(jù)和磁盤上的數(shù)據(jù)進(jìn)行歸并排序。
復(fù)制完所有的map輸出做循環(huán)歸并排序合并數(shù)據(jù)隧期。舉個例子更加好理解飒责,若合并因子為10赘娄,50個輸出文件,則合并5次宏蛉,最后剩下5個文件不符合合并條件遣臼,則將這5個文件交給Reduce處理。
Reduce階段會接收到已經(jīng)排完序的k-v對拾并,然后對k-v對進(jìn)行邏輯處理最后輸出結(jié)果k-v對到HDFS中.

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末揍堰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子嗅义,更是在濱河造成了極大的恐慌屏歹,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,451評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件芥喇,死亡現(xiàn)場離奇詭異西采,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)继控,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評論 3 394
  • 文/潘曉璐 我一進(jìn)店門械馆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人武通,你說我怎么就攤上這事霹崎。” “怎么了冶忱?”我有些...
    開封第一講書人閱讀 164,782評論 0 354
  • 文/不壞的土叔 我叫張陵尾菇,是天一觀的道長。 經(jīng)常有香客問我囚枪,道長派诬,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,709評論 1 294
  • 正文 為了忘掉前任链沼,我火速辦了婚禮默赂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘括勺。我一直安慰自己缆八,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,733評論 6 392
  • 文/花漫 我一把揭開白布疾捍。 她就那樣靜靜地躺著奈辰,像睡著了一般。 火紅的嫁衣襯著肌膚如雪乱豆。 梳的紋絲不亂的頭發(fā)上奖恰,一...
    開封第一講書人閱讀 51,578評論 1 305
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼房官。 笑死趾徽,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的翰守。 我是一名探鬼主播,決...
    沈念sama閱讀 40,320評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼疲酌,長吁一口氣:“原來是場噩夢啊……” “哼蜡峰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起朗恳,我...
    開封第一講書人閱讀 39,241評論 0 276
  • 序言:老撾萬榮一對情侶失蹤湿颅,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后粥诫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體油航,經(jīng)...
    沈念sama閱讀 45,686評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,878評論 3 336
  • 正文 我和宋清朗相戀三年怀浆,在試婚紗的時候發(fā)現(xiàn)自己被綠了谊囚。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,992評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡执赡,死狀恐怖镰踏,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情沙合,我是刑警寧澤奠伪,帶...
    沈念sama閱讀 35,715評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站首懈,受9級特大地震影響绊率,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜究履,卻給世界環(huán)境...
    茶點故事閱讀 41,336評論 3 330
  • 文/蒙蒙 一滤否、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧挎袜,春花似錦顽聂、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至全景,卻和暖如春耀石,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背爸黄。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評論 1 270
  • 我被黑心中介騙來泰國打工滞伟, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留揭鳞,地道東北人。 一個月前我還...
    沈念sama閱讀 48,173評論 3 370
  • 正文 我出身青樓梆奈,卻偏偏與公主長得像野崇,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子亩钟,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,947評論 2 355

推薦閱讀更多精彩內(nèi)容