Hadoop Map/Reduce執(zhí)行流程詳解

一個Map/Reduce 作業(yè)(job) 通常會把輸入的數(shù)據(jù)(input file)切分為若干獨立的數(shù)據(jù)塊(splits)嫩舟,然后由 map任務(task)以完全并行的方式處理它們。Map/Reduce框架會對map的輸出做一個 Shuffle 操作,Shuffle 操作的后的結(jié)果會輸入給reduce任務肛走。整個Map/Reduce框架負責任務的調(diào)度和監(jiān)控焚志,以及重新執(zhí)行已經(jīng)失敗的任務。

請點擊此處輸入圖片描述

Map/Reduce計算集群由一個單獨的JobTracker(master) 和每個集群節(jié)點一個 TaskTracker(slave)共同組成胆筒。JobTracker負責調(diào)度構(gòu)成一個作業(yè)的所有任務邮破,這些任務會被分派到不同的TaskTracker上去執(zhí)行,JobTracker會監(jiān)控它們的執(zhí)行仆救、重新執(zhí)行已經(jīng)失敗的任務抒和。而TaskTracker僅負責執(zhí)行由JobTracker指派的任務。

請點擊此處輸入圖片描述

本文將按照map/reduce執(zhí)行流程中各個任務的時間順序詳細敘述map/reduce的各個任務模塊彤蔽,包括:輸入分片(input split)摧莽、map階段、combiner階段顿痪、shuffle階段和reduce階段镊辕。下圖是一個不錯的執(zhí)行流程圖:

請點擊此處輸入圖片描述

作業(yè)的提交與監(jiān)控

JobClient是用戶提交的作業(yè)與JobTracker交互的主要接口。

請點擊此處輸入圖片描述

JobClient提交作業(yè)的過程如下:

(1) map/reduce程序通過runJob()方法新建一個JobClient實例;

(2) 向JobTracker請求一個新jobID蚁袭,通過JobTracker的getNewJobId()獲日餍浮;

(3) 檢查作業(yè)輸入輸出說明撕阎。如果沒有指定輸出目錄或者輸出目錄已經(jīng)存在受裹,作業(yè)將不會被提交,map/reduce程序虏束; 輸入作業(yè)劃分split棉饶,如果劃分無法計算(如:輸入路徑不存在),作業(yè)將不會被提交镇匀,錯誤返回給map/reduce程序照藻。

(4) 將運行作業(yè)所需要的資源(作業(yè)的jar文件、配置文件汗侵、計算所得的輸入劃分)復制到一個以作業(yè)ID命名的目錄中幸缕;

(5) 通過調(diào)用JobTracker的submitJob()方法群发,告訴JobTracker作業(yè)準備提交;

(6) JobTracker將提交的作業(yè)放到一個內(nèi)部隊列中发乔,交由作業(yè)調(diào)度器進行調(diào)度熟妓,并對其進行初始化。

(7) 創(chuàng)建Map任務栏尚、Reduce任務:一個split對應一個map起愈,有多少split就有多少map; Reduce任務的數(shù)量由JobConf的mapred.reduce.tasks屬性決定

(8) TaskTracker執(zhí)行一個簡單的循環(huán),定期發(fā)送心跳(heartbeat)給JobTracker

Input files

Input file是map/reduce任務的原始數(shù)據(jù)译仗,一般存儲在HDFS上抬虽。應用程序至少應該指明輸入/輸出的位置(路徑),并通過實現(xiàn)合適的接口或抽象類提供map和reduce函數(shù)纵菌。再加上其他作業(yè)的參數(shù)阐污,就構(gòu)成了作業(yè)配置(job configuration)。然后咱圆,Hadoop的 job client提交作業(yè)(jar包/可執(zhí)行程序等)和配置信息給JobTracker笛辟,后者負責分發(fā)這些軟件和配置信息給slave、調(diào)度任務并監(jiān)控它們的執(zhí)行序苏,同時提供狀態(tài)和診斷信息給job-client隘膘。

InputFormat

InputFormat為Map/Reduce作業(yè)輸入的細節(jié)規(guī)范。Map/Reduce框架根據(jù)作業(yè)的InputFormat來:

(1) 檢查作業(yè)輸入的正確性杠览,如格式等。

(2) 把輸入文件切分成多個邏輯InputSplit實例纵势, 一個InputSplit將會被分配給一個獨立的Map任務踱阿。

(3) 提供RecordReader實現(xiàn),這個RecordReader從邏輯InputSplit中獲得輸入記錄(”K-V對”)钦铁,這些記錄將由Map任務處理软舌。

InputFormat有如下幾種:

請點擊此處輸入圖片描述

TextInputFormat:

TextInputFormat是默認的INputFormat,輸入文件中的每一行就是一個記錄牛曹,Key是這一行的byte offset佛点,而value是這一行的內(nèi)容。如果一個作業(yè)的Inputformat是TextInputFormat黎比,并且框架檢測到輸入文件的后綴是.gz和.lzo超营,就會使用對應的CompressionCodec自動解壓縮這些文件。但是需要注意阅虫,上述帶后綴的壓縮文件不會被切分演闭,并且整個壓縮文件會分給一個mapper來處理。

KeyValueTextInputFormat

輸入文件中每一行就是一個記錄颓帝,第一個分隔符字符切分每行米碰。在分隔符字符之前的內(nèi)容為Key窝革,在之后的為Value。分隔符變量通過key.value.separator.in.input.line變量設置吕座,默認為(\t)字符虐译。

NLineInputFormat

與TextInputFormat一樣,但每個數(shù)據(jù)塊必須保證有且只有N行吴趴,mapred.line.input.format.linespermap屬性漆诽,默認為1。

SequenceFileInputFormat

一個用來讀取字符流數(shù)據(jù)的InputFormat史侣,為用戶自定義的拴泌。字符流數(shù)據(jù)是Hadoop自定義的壓縮的二進制數(shù)據(jù)格式。它用來優(yōu)化從一個MapReduce任務的輸出到另一個MapReduce任務的輸入之間的數(shù)據(jù)傳輸過程惊橱。

InputSplits

InputSplit是一個單獨的Map任務需要處理的數(shù)據(jù)塊蚪腐。一般的InputSplit是字節(jié)樣式輸入,然后由RecordReader處理并轉(zhuǎn)化成記錄樣式税朴。通常一個split就是一個block回季,這樣做的好處是使得Map任務可以在存儲有當前數(shù)據(jù)的節(jié)點上運行本地的任務,而不需要通過網(wǎng)絡進行跨節(jié)點的任務調(diào)度正林。

可以通過設置mapred.min.split.size泡一,mapred.max.split.size,block.size來控制拆分的大小。如果mapred.min.split.size大于block size觅廓,則會將兩個block合成到一個split鼻忠,這樣有部分block數(shù)據(jù)需要通過網(wǎng)絡讀取杈绸;如果mapred.max.split.size小于block size帖蔓,則會將一個block拆成多個split,增加了Map任務數(shù)瞳脓。

假設splitSize是默認的64M塑娇,現(xiàn)在輸入包含3個文件,這3個文件的大小分別為10M劫侧,64M埋酬,100M,那么這3個文件會被分割為:

1 2輸入文件大小? ? ? ? ? ? ? ? 10M? ? 64M? ? 100M 分割后的InputSplit大小? ? ? 10M? ? 64M? ? 64M烧栋,36M

在Map任務開始前写妥,會先獲取文件在HDFS上的路徑和block信息,然后根據(jù)splitSize對文件進行切分(splitSize = computeSplitSize(blockSize, minSize, maxSize) )劲弦,默認splitSize 就等于blockSize的默認值(64m)耳标。

Mapper

Map是一類將輸入記錄集轉(zhuǎn)換為中間格式記錄集的獨立任務,主要是讀取InputSplit的每一個Key,Value對并進行處理

請點擊此處輸入圖片描述

確定map任務數(shù)量

Map/Reduce框架為每一個InputSplit產(chǎn)生一個map任務邑跪,而每個InputSplit是由該作業(yè)的InputFormat產(chǎn)生的,默認一個InputSplit大小就等于blockSize的默認值次坡。因此呼猪,maps的數(shù)量通常取決于輸入大小,也即輸入文件的block數(shù)。 因此砸琅,假如輸入數(shù)據(jù)有10TB宋距,而block大小為64M,則需要164,000個map症脂。map正常的并行規(guī)模大致是每個節(jié)點(node)大約10到100個map谚赎,對于CPU 消耗較小的map任務可以設到300個左右。

因為啟動任務也需要時間诱篷,所以在一個較大的作業(yè)中壶唤,最好每個map任務的執(zhí)行時間不要少于1分鐘,這樣可以讓啟動任務的開銷占比盡可能的低棕所。對于那種有大量小文件輸入的的作業(yè)來說闸盔,一個map處理多個文件會更有效率。如果輸入的是打文件琳省,那么一種提高效率的方式是增加block的大杏场(比如512M),每個map還是處理一個完整的HDFS的block针贬。

當在map處理的block比較大的時候击费,確保有足夠的內(nèi)存作為排序緩沖區(qū)是非常重要的,這可以加速map端的排序過程桦他。假如大多數(shù)的map輸出都能在排序緩沖區(qū)中處理的話應用的性能會有極大的提升蔫巩。這需要運行map過程的JVM具有更大的堆。

網(wǎng)格模式:確保map的大小快压,使得所有的map輸出可以在排序緩沖區(qū)中通過一次排序來完成操作批幌。

合適的map數(shù)量有以下好處:

(1) 減少了調(diào)度的負擔;更少的map意味著任務調(diào)度更簡單嗓节,集群中可用的空閑槽更多。

(2) 有足夠的內(nèi)存將map輸出容納在排序緩存中皆警,這使map端更有效率拦宣;

(3) 減少了需要shuffle map輸出的尋址次數(shù),每個map產(chǎn)生的輸出可用于每一個reduce信姓,因此尋址數(shù)就是map個數(shù)乘以reduce個數(shù)鸵隧;

(4) 每個shuffle 的片段更大,這減少了建立連接的相對開銷意推,所謂相對開銷是指相對于在網(wǎng)絡中傳輸數(shù)據(jù)的過程豆瘫。

(5) 這使reduce端合并map輸出的過程更高效,因為合并的次數(shù)更少菊值,因為需要合并的文件段更少了外驱。

執(zhí)行map任務

Mapper的實現(xiàn)者需要重寫 JobConfigurable.configure(JobConf)方法育灸,這個方法需要傳遞一個JobConf參數(shù),目的是完成Mapper的初始化工作昵宇。然后磅崭,框架為這個任務的InputSplit中每個鍵值對調(diào)用一次 map(WritableComparable, Writable, OutputCollector, Reporter)操作。這里需要指出很多人的一種錯誤認識——“輸入和輸出的鍵值對類型一致瓦哎,一一對應”砸喻,這種認識是錯誤的。輸入輸出鍵值對的關系如下:

1 21) 輸出鍵值對不需要與輸入鍵值對的類型一致蒋譬。 2) 一個給定的輸入鍵值對可以映射成0個或多個輸出鍵值對割岛。

以 word count為例,輸入不需要是“一行個單詞”的形式犯助,可以是一行許多個單詞癣漆,輸入一行可以對應多行輸出,如下圖所示:

請點擊此處輸入圖片描述

通過調(diào)用OutputCollector.collect(WritableComparable,Writable)可以收集map(WritableComparable, Writable, OutputCollector, Reporter)輸出的鍵值對也切。應用程序可以使用Reporter報告進度扑媚,設定應用級別的狀態(tài)消息,更新Counters(計數(shù)器)雷恃,或者僅是表明自己運行正常疆股。

Map/Reduce框架隨后會把與一個特定key關聯(lián)的所有中間過程的值(value)分組并排序這個分組和排序過程被稱為Shuffle,然后把它們傳給Reducer以產(chǎn)出最終的結(jié)果。分組的總數(shù)目和一個作業(yè)的reduce任務的數(shù)目是一樣的倒槐。用戶可以通過實現(xiàn)自定義的 Partitioner來控制哪個key被分配給哪個 Reducer旬痹。 對于map的輸出,用戶可選擇通過JobConf.setCombinerClass(Class)指定一個combiner讨越,它負責對中間過程的輸出進行本地的聚集两残,這會有助于降低從Mapper到 Reducer數(shù)據(jù)傳輸量。

請點擊此處輸入圖片描述

這些被排好序的中間過程的輸出結(jié)果保存的格式是(key-len, key, value-len, value)把跨,應用程序可以通過JobConf控制對這些中間結(jié)果是否進行壓縮以及怎么壓縮人弓,使用哪種CompressionCodec。

整個map的執(zhí)行過程如下圖所示:

請點擊此處輸入圖片描述

map輸出溢寫(spill) && Shuffle

Shuffle

一般把從map任務輸出到reducer任務輸入之間的map/reduce框架所做的工作叫做shuffle着逐。這部分也是map/reduce框架最重要的部分崔赌。下面將詳細介紹這個shuffle中的各個步驟。

請點擊此處輸入圖片描述

內(nèi)存緩沖區(qū)

Map/Reduce框架為InputSplit中的每個鍵值對調(diào)用一次 map(WritableComparable, Writable, OutputCollector, Reporter)操作耸别,調(diào)用一次map()操作后就會得到一個新的(key,value)對健芭。當Map程序開始產(chǎn)生結(jié)果的時候,并不是直接寫到文件的秀姐,而是寫到一個內(nèi)存緩沖區(qū)(環(huán)形內(nèi)存緩沖區(qū))慈迈。每個map任務都有一個內(nèi)存緩沖區(qū),存儲著map的輸出結(jié)果省有,這個內(nèi)存緩沖區(qū)是有大小限制的痒留,默認是100MB(可以通過屬性io.sort.mb配置)谴麦。

當map task的輸出結(jié)果很多時,就可能會超過100MB內(nèi)存的限制狭瞎,所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時寫入磁盤细移,然后重新利用這塊緩沖區(qū)。這個從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為“spill”熊锭,中文可譯為溢寫弧轧。這個溢寫是由單獨線程來完成,不影響往緩沖區(qū)寫map結(jié)果的線程碗殷。

請點擊此處輸入圖片描述

溢寫線程啟動時不應該阻止map的結(jié)果輸出精绎,所以整個緩沖區(qū)有個溢寫的比例spill.percent(可以通過屬性Io.sort.spill.percent配置),這個比例默認是0.8锌妻,也就是當緩沖區(qū)的數(shù)據(jù)已經(jīng)達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB)代乃,溢寫線程啟動,鎖定這80MB的內(nèi)存仿粹,執(zhí)行溢寫過程搁吓。Map任務的輸出結(jié)果還可以往剩下的20MB內(nèi)存中寫,互不影響吭历,但如果緩沖區(qū)滿了堕仔,Map任務則會被阻塞。那么為什么需要設置寫入比例呢晌区?達到一定比例后摩骨,由于寫緩存和讀緩存是可以同時并行執(zhí)行的,這會降低把緩存數(shù)據(jù)騰空的時間朗若,從而提高效率恼五。

分區(qū)

在把map()輸出數(shù)據(jù)寫入內(nèi)存緩沖區(qū)之前會先進行Partitioner操作。Partitioner用于劃分鍵值空間(key space)哭懈。MapReduce提供Partitioner接口灾馒,它的作用就是根據(jù)key或value及reduce的數(shù)量來決定當前的這對輸出數(shù)據(jù)最終應該交由哪個reduce task處理。默認對key hash后再以reduce task數(shù)量取模遣总。默認的取模方式只是為了平均reduce的處理能力你虹,如果用戶自己對Partitioner有需求,可以訂制并設置到job上彤避。

1reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks

HashPartitioner是默認的 Partitioner。

Partitioner操作得到的分區(qū)元數(shù)據(jù)也會被存儲到內(nèi)存緩沖區(qū)中夯辖。當數(shù)據(jù)達到溢出的條件時琉预,讀取緩存中的數(shù)據(jù)和分區(qū)元數(shù)據(jù),然后把屬與同一分區(qū)的數(shù)據(jù)合并到一起蒿褂。對于每一個分區(qū)圆米,都會在內(nèi)存中根據(jù)map輸出的key進行排序(排序是MapReduce模型默認的行為卒暂,這里的排序也是對序列化的字節(jié)做的排序),如果配置了Combiner娄帖,則排序后執(zhí)行Combiner(Combine之后可以減少寫入文件和傳輸?shù)臄?shù)據(jù))也祠。如果配置了壓縮,則最終寫入的文件會先進行壓縮近速,這樣可以減少寫入和傳輸?shù)臄?shù)據(jù)诈嘿。最后實現(xiàn)溢出的文件內(nèi)是分區(qū)的,且分區(qū)內(nèi)是有序的削葱。

每次溢出的數(shù)據(jù)寫入文件時奖亚,都按照分區(qū)的數(shù)值從小到大排序,內(nèi)部存儲是以tag的方式區(qū)分不同分區(qū)的數(shù)據(jù)析砸;同時生成一個索引文件昔字,這個索引文件記錄分區(qū)的描述信息,包括:起始位置首繁、長度作郭、以及壓縮長度,這些信息存儲在IndexRecord結(jié)構(gòu)里面弦疮。一個spill文件中的多個段的索引數(shù)據(jù)被組織成SpillRecord結(jié)構(gòu)夹攒,SpillRecord又被加入進indexCacheList中。

請點擊此處輸入圖片描述

Combiner

Combiner最主要的好處在于減少了shuffle過程從map端到reduce端的傳輸數(shù)據(jù)量挂捅。

請點擊此處輸入圖片描述

combiner階段是程序員可以選擇的芹助,combiner其實也是一種reduce操作。Combiner是一個本地化的reduce操作闲先,它是map運算的后續(xù)操作状土,主要是在map計算出中間文件前做一個簡單的合并重復key值的操作,例如我們對文件里的單詞頻率做統(tǒng)計伺糠,map計算時候如果碰到一個hadoop的單詞就會記錄為1蒙谓,但是這篇文章里hadoop可能會出現(xiàn)n多次,那么map輸出文件冗余就會很多训桶,因此在reduce計算前對相同的key做一個合并操作累驮,那么文件會變小,這樣就提高了寬帶的傳輸效率舵揭,畢竟hadoop計算力寬帶資源往往是計算的瓶頸也是最為寶貴的資源谤专,但是combiner操作是有風險的,使用它的原則是combiner的輸入不會影響到reduce計算的最終輸入午绳,例如:如果計算只是求總數(shù)置侍,最大值,最小值可以使用combiner,但是做平均值計算使用combiner的話蜡坊,最終的reduce計算結(jié)果就會出錯杠输。

Combiner 也有一個性能損失點,因為它需要一次額外的對于map輸出的序列化/反序列化過程秕衙。不能通過聚合將map端的輸出減少到20-30%的話就不適用combiner蠢甲。

壓縮

Map/Reduce框架為應用程序的寫入文件操作提供壓縮工具,這些工具可以為map輸出的中間數(shù)據(jù)和作業(yè)最終輸出數(shù)據(jù)(例如reduce的輸出)提供支持据忘。

壓縮中間數(shù)據(jù): 對map輸出的中間數(shù)據(jù)進行合適的壓縮可以減少map到reduce之間的網(wǎng)絡數(shù)據(jù)傳輸量鹦牛,從而提高性能。Lzo壓縮格式是一個壓縮map中間數(shù)據(jù)的合理選擇若河,它有效利用了CPU能岩。

壓縮應用輸出: 使用合適的壓縮格式壓縮輸出數(shù)據(jù)能夠減少應用的運行時間。Zlib/Gzip 格式在大多數(shù)情況下都是比較適當?shù)倪x擇萧福,因為它在較高壓縮率的情況下壓縮速度也還算可以拉鹃,bzip2 就慢得多了。

合并臨時文件

每次spill操作也就是寫入磁盤操作時候就會寫一個溢出文件鲫忍,也就是說在做map輸出有幾次spill就會產(chǎn)生多少個溢出文件膏燕,等map輸出全部做完后,map會合并這些輸出文件生成最終的正式輸出文件悟民,然后等待reduce任務來拉數(shù)據(jù)坝辫。將這些溢寫文件歸并到一起的過程叫做Merge。

請點擊此處輸入圖片描述

如果生成的文件太多射亏,可能會執(zhí)行多次合并近忙,每次最多能合并的文件數(shù)默認為10,可以通過屬性min.num.spills.for.combine配置智润。 多個溢出文件合并是及舍,同一個分區(qū)內(nèi)部也必須再做一次排序,排序算法是多路歸并排序窟绷。是否還需要做combine操作锯玛,一是看是否設置了combine,二是看溢出的文件數(shù)是否大于等于3兼蜈。最終生成的文件格式與單個溢出文件一致攘残,也是按分區(qū)順序存儲,并且有一個對應的索引文件为狸,記錄每個分區(qū)數(shù)據(jù)的起始位置歼郭,長度以及壓縮長度。這個索引文件名叫做file.out.index辐棒。

至此病曾,map端的所有工作都已結(jié)束姊途,最終生成的這個文件也存放在TaskTracker夠得著的某個本地目錄內(nèi)。每個reduce task不斷地通過RPC從JobTracker那里獲取map task是否完成的信息知态,如果reduce task得到通知,Reduce就可以開始復制結(jié)果數(shù)據(jù)立叛。

Reduce

簡單地說负敏,reduce任務在執(zhí)行之前的工作就是不斷地拉取每個map任務的最終結(jié)果,然后對從不同地方拉取過來的數(shù)據(jù)不斷地做merge秘蛇,也最終形成一個文件作為reduce任務的輸入文件其做。

請點擊此處輸入圖片描述

reduce的運行可以分成copy、merge赁还、reduce三個階段妖泄,下面將具體說明這3個階段的詳細執(zhí)行流程。

copy

由于job的每一個map都會根據(jù)reduce(n)數(shù)將數(shù)據(jù)分成map 輸出結(jié)果分成n個partition艘策,所以map的中間結(jié)果中是有可能包含每一個reduce需要處理的部分數(shù)據(jù)的蹈胡。所以,為了優(yōu)化reduce的執(zhí)行時間朋蔫,hadoop中是等job的第一個map結(jié)束后罚渐,所有的reduce就開始嘗試從完成的map中下載該reduce對應的partition部分數(shù)據(jù),因此map和reduce是交叉進行的驯妄,如下圖所示:

請點擊此處輸入圖片描述

reduce進程啟動數(shù)據(jù)copy線程(Fetcher)荷并,通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。由于map通常有許多個青扔,所以對一個reduce來說源织,下載也可以是并行的從多個map下載,這個并行度是可以通過mapred.reduce.parallel.copies(default 5)調(diào)整微猖。默認情況下谈息,每個只會有5個并行的下載線程在從map下數(shù)據(jù),如果一個時間段內(nèi)job完成的map有100個或者更多励两,那么reduce也最多只能同時下載5個map的數(shù)據(jù)黎茎,所以這個參數(shù)比較適合map很多并且完成的比較快的job的情況下調(diào)大,有利于reduce更快的獲取屬于自己部分的數(shù)據(jù)当悔。

reduce的每一個下載線程在下載某個map數(shù)據(jù)的時候傅瞻,有可能因為那個map中間結(jié)果所在機器發(fā)生錯誤,或者中間結(jié)果的文件丟失盲憎,或者網(wǎng)絡瞬斷等等情況嗅骄,這樣reduce的下載就有可能失敗,所以reduce的下載線程并不會無休止的等待下去饼疙,當一定時間后下載仍然失敗溺森,那么下載線程就會放棄這次下載慕爬,并在隨后嘗試從另外的地方下載(因為這段時間map可能重跑)。reduce下載線程的這個最大的下載時間段是可以通過mapred.reduce.copy.backoff(default 300秒)調(diào)整的屏积。如果集群環(huán)境的網(wǎng)絡本身是瓶頸医窿,那么用戶可以通過調(diào)大這個參數(shù)來避免reduce下載線程被誤判為失敗的情況挖垛。不過在網(wǎng)絡環(huán)境比較好的情況下臀规,沒有必要調(diào)整。通常來說專業(yè)的集群網(wǎng)絡不應該有太大問題扶平,所以這個參數(shù)需要調(diào)整的情況不多渣聚。

merge

這里的merge如map端的merge動作類似独榴,只是數(shù)組中存放的是不同map端copy來的數(shù)值。Copy過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中奕枝,然后當使用內(nèi)存達到一定量的時候才刷入磁盤棺榔。這里需要強調(diào)的是,merge有三種形式:1)內(nèi)存到內(nèi)存 2)內(nèi)存到磁盤 3)磁盤到磁盤隘道。內(nèi)存到內(nèi)存的merge一般不適用症歇,主要是內(nèi)存到磁盤和磁盤到磁盤的merge。

這里的緩沖區(qū)大小要比map端的更為靈活薄声,它基于JVM的heap size設置当船。這個內(nèi)存大小的控制就不像map一樣可以通過io.sort.mb來設定了,而是通過另外一個參數(shù)mapred.job.shuffle.input.buffer.percent(default 0.7)來設置默辨, 這個參數(shù)其實是一個百分比德频,意思是說,shuffile在reduce內(nèi)存中的數(shù)據(jù)最多使用內(nèi)存量為:0.7 × maxHeap of reduce task缩幸。

也就是說壹置,如果該reduce task的最大heap使用量(通常通過mapred.child.java.opts來設置,比如設置為-Xmx1024m)的一定比例用來緩存數(shù)據(jù)表谊。默認情況下钞护,reduce會使用其heapsize的70%來在內(nèi)存中緩存數(shù)據(jù)。假設mapred.job.shuffle.input.buffer.percent為0.7爆办,reduce task的max heapsize為1G难咕,那么用來做下載數(shù)據(jù)緩存的內(nèi)存就為大概700MB左右。這700M的內(nèi)存距辆,跟map端一樣余佃,也不是要等到全部寫滿才會往磁盤刷的,而是當這700M中被使用到了一定的限度(通常是一個百分比)跨算,就會開始往磁盤刷(刷磁盤前會先做sort)爆土。這個限度閾值也是可以通過參數(shù)mapred.job.shuffle.merge.percent(default 0.66)來設定。與map 端類似诸蚕,這也是溢寫的過程步势,這個過程中如果你設置有Combiner氧猬,也是會啟用的,然后在磁盤中生成了眾多的溢寫文件坏瘩。這種merge方式一直在運行盅抚,直到?jīng)]有map端的數(shù)據(jù)時才結(jié)束,然后啟動磁盤到磁盤的merge方式生成最終的那個文件倔矾。

reducer

當reduce將所有的map上對應自己partition的數(shù)據(jù)下載完成后泉哈,就會開始真正的reduce計算階段。當reduce task真正進入reduce函數(shù)的計算階段的時候破讨,有一個參數(shù)也是可以調(diào)整reduce的計算行為。也就是mapred.job.reduce.input.buffer.percent(default 0.0)奕纫。由于reduce計算時肯定也是需要消耗內(nèi)存的提陶,而在讀取reduce需要的數(shù)據(jù)時,同樣是需要內(nèi)存作為buffer匹层,這個參數(shù)是控制隙笆,需要多少的內(nèi)存百分比來作為reduce讀已經(jīng)sort好的數(shù)據(jù)的buffer百分比。默認情況下為0升筏,也就是說撑柔,默認情況下,reduce是全部從磁盤開始讀處理數(shù)據(jù)您访。如果這個參數(shù)大于0铅忿,那么就會有一定量的數(shù)據(jù)被緩存在內(nèi)存并輸送給reduce,當reduce計算邏輯消耗內(nèi)存很小時灵汪,可以分一部分內(nèi)存用來緩存數(shù)據(jù)檀训,反正reduce的內(nèi)存閑著也是閑著。

Reduce在這個階段享言,框架為已分組的輸入數(shù)據(jù)中的每個 對調(diào)用一次reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法峻凫。 Reduce任務的輸出通常是通過調(diào)用 OutputCollector.collect(WritableComparable, Writable)寫入 文件系統(tǒng)的。Reducer的輸出是沒有排序的览露。

那么一般需要多少個Reduce呢荧琼?

Reduce的數(shù)目建議是0.95或1.75乘以 ( * mapred.tasktracker.reduce.tasks.maximum)。 用0.95差牛,所有reduce可以在maps一完成時就立刻啟動命锄,開始傳輸map的輸出結(jié)果。用1.75多糠,速度快的節(jié)點可以在完成第一輪reduce任務后累舷,可以開始第二輪,這樣可以得到比較好的負載均衡的效果夹孔。

reduces的性能很大程度上受shuffle的性能所影響被盈。應用配置的reduces數(shù)量是一個決定性的因素析孽。太多或者太少的reduce都不利于發(fā)揮最佳性能:?太少的reduce會使得reduce運行的節(jié)點處于過度負載狀態(tài),在極端情況下我們見過一個reduce要處理100g的數(shù)據(jù)只怎。這對于失敗恢復有著非常致命的負面影響袜瞬,因為失敗的reduce對作業(yè)的影響非常大。太多的reduce對shuffle過程有不利影響身堡。在極端情況下會導致作業(yè)的輸出都是些小文件邓尤,這對NameNode不利,并且會影響接下來要處理這些小文件的mapreduce應用的性能贴谎。在大多數(shù)情況下汞扎,應用應該保證每個reduce處理1-2g數(shù)據(jù),最多5-10g擅这。

The output files

作業(yè)的輸出OutputFormat 描述Map/Reduce作業(yè)的輸出樣式澈魄。Map/Reduce框架根據(jù)作業(yè)的OutputFormat來:

1 21. 檢驗作業(yè)的輸出,例如檢查輸出路徑是否已經(jīng)存在仲翎。 2. 提供一個RecordWriter的實現(xiàn)痹扇,用來輸出作業(yè)結(jié)果。 輸出文件保存在FileSystem上溯香。

OutputFormat主要有以下幾種:

請點擊此處輸入圖片描述

TextOutputFormat是默認的 OutputFormat鲫构。

計數(shù)器(Counters)

計數(shù)器(Counters) 展現(xiàn)一些全局性的統(tǒng)計度量,這些度量由map/reduce框架本身玫坛,也可由應用來設定结笨。應用可以自行定義任意的計數(shù)器并且在map或者reduce方法中更新它們的值∈疲框架會對計數(shù)器的值做全局聚合禀梳。 計數(shù)器適合于追蹤記錄一些量不是很大,但是很重要的全局性信息肠骆。不應該用于一些粒度過細的信息統(tǒng)計算途。 使用計數(shù)器的代價非常昂貴,因為在應用的生命周期內(nèi)JobTracker需要給每一個map/reduce任務維護一組計數(shù)器(定義了多少個就維護多少個)蚀腿。

Reporter是用于map/reduce應用程序報告進度嘴瓤,設定應用級別的狀態(tài)消息, 更新Counters(計數(shù)器)的機制莉钙。

Ref

Apache Hadoop: Best Practices and Anti-Patterns

Understanding Hadoop Clusters and the Network

Introduction to MapReduce

http://hadoop.apache.org/docs/r1.0.4/cn/streaming.html

MapReduce:詳解Shuffle過程

http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html

Anatomy of a MapReduce Job

http://pennywong.gitbooks.io/hadoop-notebook/content/mapreduce/introduction.html

https://developer.yahoo.com/hadoop/tutorial/module4.html

https://developer.yahoo.com/hadoop/tutorial/module5.html

長按掃一掃廓脆,關注我們。每天都有精彩干貨哦磁玉!~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末停忿,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子蚊伞,更是在濱河造成了極大的恐慌席赂,老刑警劉巖吮铭,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異颅停,居然都是意外死亡谓晌,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門癞揉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來纸肉,“玉大人,你說我怎么就攤上這事喊熟“胤荆” “怎么了?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵芥牌,是天一觀的道長预吆。 經(jīng)常有香客問我,道長胳泉,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任岩遗,我火速辦了婚禮扇商,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘宿礁。我一直安慰自己案铺,他們只是感情好,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布梆靖。 她就那樣靜靜地躺著控汉,像睡著了一般。 火紅的嫁衣襯著肌膚如雪返吻。 梳的紋絲不亂的頭發(fā)上姑子,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天,我揣著相機與錄音测僵,去河邊找鬼街佑。 笑死,一個胖子當著我的面吹牛捍靠,可吹牛的內(nèi)容都是我干的沐旨。 我是一名探鬼主播,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼榨婆,長吁一口氣:“原來是場噩夢啊……” “哼磁携!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起良风,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤谊迄,失蹤者是張志新(化名)和其女友劉穎闷供,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鳞上,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡这吻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了篙议。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片唾糯。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖鬼贱,靈堂內(nèi)的尸體忽然破棺而出移怯,到底是詐尸還是另有隱情,我是刑警寧澤这难,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布舟误,位于F島的核電站,受9級特大地震影響姻乓,放射性物質(zhì)發(fā)生泄漏嵌溢。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一蹋岩、第九天 我趴在偏房一處隱蔽的房頂上張望赖草。 院中可真熱鬧,春花似錦剪个、人聲如沸秧骑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽乎折。三九已至,卻和暖如春侵歇,著一層夾襖步出監(jiān)牢的瞬間骂澄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工惕虑, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留酗洒,地道東北人。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓枷遂,卻偏偏與公主長得像樱衷,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子酒唉,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 目的這篇教程從用戶的角度出發(fā)矩桂,全面地介紹了Hadoop Map/Reduce框架的各個方面。先決條件請先確認Had...
    SeanC52111閱讀 1,708評論 0 1
  • 參考:hadoop 學習筆記:mapreduce框架詳解 [toc] 總結(jié) Mapreduce是一個計算框架,既然...
    小小少年Boy閱讀 1,165評論 0 4
  • MapReduce過程詳解及其性能優(yōu)化 [toc] 轉(zhuǎn)載:MapReduce過程詳解及其性能優(yōu)化 總結(jié) 詳情 從J...
    小小少年Boy閱讀 7,035評論 2 18
  • 思考問題 MapReduce總結(jié) MapReduce MapReduce的定義MapReduce是一種編程模型侄榴, ...
    Sakura_P閱讀 934評論 0 1
  • MapReduce框架結(jié)構(gòu)## MapReduce是一個用于大規(guī)模數(shù)據(jù)處理的分布式計算模型MapReduce模型主...
    Bloo_m閱讀 3,724評論 0 4