MapReduce-深度解析

MR Workflow

圖片摘自于Alexey Grishchenko博文

Map-side

InputFormat Class :

  • getSplits: the set of input data splits 返回一組輸入數(shù)據(jù)的拆分文件

  • getRecordReader: iterable interface for reading all the records from a single input

    ? 從單個輸入文件提供可迭代的接口用于讀取輸入數(shù)據(jù)

文件塊大小取決于InputFormat(自定義輸入文件拆分需要繼承此FileInputFormat)

  • 輸入文件是文本類型需要配置dfs.blocksize的大小(hdfs-site.xml)

  • gzip類型的壓縮文件挺物,不可拆分需要用輸入整個文件等勒极。

每個mapper處理一個輸入拆分塊文件晃择,大多數(shù)時處理128MB大小文件若輸入文件是以GB或PB或更大磨确。

map 函數(shù)運用于輸入拆分文件的每對鍵值對(k酿愧,v)挺狰,它們的每對鍵值對都有RecordReader 返回身隐。

根據(jù)業(yè)務(wù)邏輯需求廷区,在mapper對每對鍵值做處理輸出結(jié)果,將結(jié)果由Context文本類傳給reducer端贾铝。

負責收集map的輸出數(shù)據(jù)(如文件)是mapreduce.job.map.output.collector 屬性 (mapreduce-default.xml)隙轻,默認是由org.apache.hadoop.mapred.MapTask$MapOutputBuffer 實現(xiàn)。

Map 函數(shù)的輸出結(jié)果首先調(diào)用從PartitionergetPartition方法垢揩。 getPartition需要傳入鍵值對(k玖绿,v) 還有reduce的任務(wù)數(shù)量(numReduceTasks),然后返回這些鍵值對匹配的分區(qū)水孩。

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

接著把輸出數(shù)據(jù)鍵值對和分區(qū)號一同寫入環(huán)形緩沖區(qū) (ring buffer), 此緩沖區(qū)大小由mapreduce.task.io.sort.mb (mapreduce-default.xml) 定義镰矿, 默認為100MB, 最大map輸出數(shù)據(jù)信息允許在ring buffer占用的大小. 若輸出數(shù)據(jù)大小大于此值俘种,數(shù)據(jù)會被溢出到(寫入)硬盤秤标。

注意绝淡,map 輸出的環(huán)形緩沖區(qū)默認是比輸入文件的拆分塊(默認是128MB) 要小,所以多余的會被寫到硬盤中苍姜。

溢出的操作是由新的線程執(zhí)行牢酵,起初環(huán)形緩沖區(qū)的大小為0.8mapreduce.map.sort.spill.percent (mapreduce-default.xml), 所以最初的緩沖區(qū)大小為80MB衙猪。 map task的輸出溢出文件大小默認是大于80MB的馍乙,多余的是會被寫到硬盤里。

溢出用開啟新線程來處理輸出文件數(shù)據(jù)是為了讓mapper在處理溢出同時也能繼續(xù)執(zhí)行處理輸入文件數(shù)據(jù)垫释。

注意: 當處理輸入數(shù)據(jù)(FileInputFormat)的數(shù)率比溢出的數(shù)率快時丝格,Mapper函數(shù)會停止工作 因為 內(nèi)存的環(huán)形緩沖區(qū)可能會達到100%。在這種情況棵譬,mapper函數(shù)會阻塞并等待溢出線程為下一批輸出數(shù)據(jù)處理清空一些內(nèi)存空間显蝌。

溢出線程會把環(huán)形緩沖區(qū)數(shù)據(jù)寫到mapper函數(shù)調(diào)用的服務(wù)器的本地的文件里。溢出線程寫出本地的路勁是由mapreduce.job.local.dirmapred-default.xml)定義订咸,此配置屬性包含了一組由集群上MapReduce任務(wù)用到的路徑來存儲零時數(shù)據(jù)曼尊。 文件夾被一個接一個使用。 寫入前脏嚷,數(shù)據(jù)會以快速排序 進行排序:comparator函數(shù)先對比partition分區(qū)號然后再對比key值骆撇,以至于先排分區(qū),再每個分區(qū)排序key值父叙。

排序完成后神郊,Combiner被調(diào)用用于減少輸入硬盤的數(shù)據(jù)量。Combiner的輸出會被寫入硬盤高每。 有個邊際情況是當mapper產(chǎn)生的輸出數(shù)據(jù)過于大不能容入內(nèi)存時(大于輸出緩沖區(qū)大杏炱瘛),sorter和combiner時不會被調(diào)用鲸匿; 這時爷怀,mapper的輸出數(shù)據(jù)就會直接寫入硬盤。

無論mapper輸出數(shù)據(jù)是多大带欢,輸出完成時“Spill”是肯定會至少調(diào)用一次运授。

(詳細信息查看:ShuffleHandler.class#sendMapOutput

protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, ? String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)

同樣的map任務(wù)會執(zhí)行,如sort和combine乔煞,還有寫入本地磁盤文件吁朦。

每個由溢出線程溢出文件有個索引包含了每個溢出文件的分區(qū)信息:分區(qū)從哪開始于哪結(jié)束。 這些索引被存在內(nèi)存中渡贾,此內(nèi)存塊大小由mapreduce.task.index.cache.limit.bytes決定逗宜,默認為1MB。 內(nèi)存不足時,所有的下一批生成的溢出文件的索引會與溢出文件一起被寫入到磁盤纺讲。

當mapper處理輸出文件與最后溢出結(jié)束時擂仍,溢出線程完成結(jié)束而合并階段開始。 在合并時熬甚,所有的溢出文件應(yīng)該合并一塊為單個map的輸出文件逢渔。一個合并過程默認可以處理10個溢出文件(由mapreduce.task.io.sort.factor決定)。

<property>
  <name>mapreduce.task.io.sort.factor</name>
  <value>10</value>
  <description>The number of streams to merge at once while sorting
  files.  This determines the number of open file handles.</description>
</property>

若溢出文件大于此屬性值乡括, 剩余的文件會被合并為一個大文件肃廓。

合并期間,若被合并的文件大于(min.num.spills.for.combine , 默認為3), 寫入磁盤前诲泌,combiner會在merge結(jié)果上執(zhí)行盲赊。

MapTask的結(jié)果是一個包含了所有的Mapper輸出結(jié)果與描述分區(qū)開始-結(jié)束的索引信息的輸出文件, 這些分區(qū)開始-結(jié)束索引信息便于ReduceTask能夠從磁盤中索取每個reducer任務(wù)的運行相關(guān)數(shù)據(jù)档礁。


Reduce-Side

Map的任務(wù)數(shù)量由拆分塊決定的角钩,然而reduce的任務(wù)用戶自己設(shè)置的(mapreduce.job.reduces, 默認為1)呻澜。 Reduce端的shuffle的實現(xiàn)由mapreduce.job.reduce.shuffle.consumer.plugin.class 屬性決定,默認為org.apache.hadoop.mapreduce.task.reduce.Shuffle惨险。

Reduce端做的第一件事就開啟”Event Fetcher" 線程羹幸,從Application Master得到Mapper的狀態(tài)并監(jiān)聽mapper的事件是否執(zhí)行結(jié)束。 當mapper結(jié)束自己的shuffle過程辫愉,mapper的輸出文件數(shù)據(jù)傳送到多個“Fetcher”線程的其中一個栅受。 “Fetcher”線程是由mapreduce.reduce.shuffle.parallelcopies 決定的,默認為5個恭朗,這意味著單個reduce任務(wù), 有五個線程來從mapper端并行拷貝數(shù)據(jù)屏镊。Fetch的節(jié)點間的傳輸是通過HTTP或HTTPS的協(xié)議連接fetcher到相應(yīng)的DataNode URL。

<property>
  <name>mapreduce.reduce.shuffle.parallelcopies</name>
  <value>5</value>
  <description>The default number of parallel transfers run by reduce
  during the copy(shuffle) phase.
  </description>
</property>

“Fetcher”從Mapper端下載的數(shù)據(jù)都被存在內(nèi)存中痰腮,此內(nèi)存大小占用reducer內(nèi)存比例由mapreduce.reduce.shuffle.input.buffer.percent決定而芥,總的reducer內(nèi)存是mapreduce.reduce.memory.totalbytes

<property>
  <name>mapreduce.reduce.shuffle.merge.percent</name>
  <value>0.66</value>
  <description>The usage threshold at which an in-memory merge will be
  initiated, expressed as a percentage of the total memory allocated to
  storing in-memory map outputs, as defined by
  mapreduce.reduce.shuffle.input.buffer.percent.
  </description>
</property>

<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>1024</value>
  <description>The amount of memory to request from the scheduler for each
  reduce task.
  </description>
</property>

若這些內(nèi)存不夠容納膀值,reducer會把數(shù)據(jù)存入reducer端的本地磁盤中的mapreduce.job.local.dir文件夾棍丐。

<property>
  <name>mapreduce.cluster.local.dir</name>
  <value>${hadoop.tmp.dir}/mapred/local</value>
  <description>The local directory where MapReduce stores intermediate
  data files.  May be a comma-separated list of
  directories on different devices in order to spread disk i/o.
  Directories that do not exist are ignored.
  </description>
</property>

fetcher索取相應(yīng)文件數(shù)據(jù)后,merger線程開啟工作沧踏。它們不會等待整個fetching過程完成而是開新線程與其并行執(zhí)行歌逢。 Hadoop有三種merger線程。

  1. InMemory merger (內(nèi)存線程)
    1. 不能關(guān)閉翘狱。 由Reduce任務(wù)索取MapTask的輸出數(shù)據(jù)占用的Reducer內(nèi)存緩存超出了總內(nèi)存允許的占用百分比reduce.shuffle.merge.percent而啟動秘案。 合并后執(zhí)行combiner。輸出寫入硬盤,總會被調(diào)用至少一次阱高。
  2. MemToMem merger (內(nèi)存到內(nèi)存)
    1. 默認為關(guān)閉师骗。 可由reduce.merge.memtomem.enabled 開啟。 此線程合并內(nèi)存中mapper的輸出文件數(shù)據(jù)并寫reduce輸出到內(nèi)存讨惩。當不同的MapTask輸出文件大小達到mapreduce.reduce.merge.memtomem.threshold (默認為1000)辟癌, 線程會被啟動。
  3. OnDisk (硬盤)
    1. 在一次線程執(zhí)行中荐捻, 當文件數(shù)量以(2 * task.io.sort.factor - 1) 上升 , 但是合并不超過mapreduce.task.io.sort.factor 的文件數(shù)量而啟動黍少。 OnDisk Merger線程合并本地磁盤的文件。

最后個線程处面, finalMerge 在reducer的主線程中運行厂置,合并所有由InMemory和OnDisk在本地磁盤產(chǎn)生剩余的文件。 最終合并輸出結(jié)果分布在RAM和硬盤之間魂角。RAM最大允許使用為reduce 輸入大小是由總的棧大小mapred.job.reduce.markreset.buffer.percent百分比來決定的昵济,默認為0.

在這些所有的線程啟動完成后,reducer會把輸出寫入HDFS文件系統(tǒng)中野揪。


原文出自于访忿,Alexey Grishchenko的Hadoop MapReduce Comprehensive Description

譯者: 邁大_阿李同學

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市斯稳,隨后出現(xiàn)的幾起案子海铆,更是在濱河造成了極大的恐慌,老刑警劉巖挣惰,帶你破解...
    沈念sama閱讀 218,451評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件卧斟,死亡現(xiàn)場離奇詭異,居然都是意外死亡憎茂,警方通過查閱死者的電腦和手機珍语,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來竖幔,“玉大人板乙,你說我怎么就攤上這事∩兔叮” “怎么了亡驰?”我有些...
    開封第一講書人閱讀 164,782評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長饿幅。 經(jīng)常有香客問我凡辱,道長,這世上最難降的妖魔是什么栗恩? 我笑而不...
    開封第一講書人閱讀 58,709評論 1 294
  • 正文 為了忘掉前任透乾,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘乳乌。我一直安慰自己捧韵,他們只是感情好,可當我...
    茶點故事閱讀 67,733評論 6 392
  • 文/花漫 我一把揭開白布汉操。 她就那樣靜靜地躺著再来,像睡著了一般。 火紅的嫁衣襯著肌膚如雪磷瘤。 梳的紋絲不亂的頭發(fā)上芒篷,一...
    開封第一講書人閱讀 51,578評論 1 305
  • 那天,我揣著相機與錄音采缚,去河邊找鬼针炉。 笑死,一個胖子當著我的面吹牛扳抽,可吹牛的內(nèi)容都是我干的篡帕。 我是一名探鬼主播,決...
    沈念sama閱讀 40,320評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼贸呢,長吁一口氣:“原來是場噩夢啊……” “哼镰烧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起贮尉,我...
    開封第一講書人閱讀 39,241評論 0 276
  • 序言:老撾萬榮一對情侶失蹤拌滋,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后猜谚,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,686評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡赌渣,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,878評論 3 336
  • 正文 我和宋清朗相戀三年魏铅,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片坚芜。...
    茶點故事閱讀 39,992評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡览芳,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出鸿竖,到底是詐尸還是另有隱情沧竟,我是刑警寧澤,帶...
    沈念sama閱讀 35,715評論 5 346
  • 正文 年R本政府宣布缚忧,位于F島的核電站悟泵,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏闪水。R本人自食惡果不足惜糕非,卻給世界環(huán)境...
    茶點故事閱讀 41,336評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧朽肥,春花似錦禁筏、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至始腾,卻和暖如春州刽,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背窘茁。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評論 1 270
  • 我被黑心中介騙來泰國打工怀伦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人山林。 一個月前我還...
    沈念sama閱讀 48,173評論 3 370
  • 正文 我出身青樓房待,卻偏偏與公主長得像,于是被迫代替她去往敵國和親驼抹。 傳聞我的和親對象是個殘疾皇子桑孩,可洞房花燭夜當晚...
    茶點故事閱讀 44,947評論 2 355

推薦閱讀更多精彩內(nèi)容