MapReduce shuffle

shuffle過程
shuffle概念

shuffle的本意是洗牌堰汉、混洗的意思,把一組有規(guī)則的數(shù)據(jù)盡量打亂成無規(guī)則的數(shù)據(jù)。而在MapReduce中审孽,shuffle更像是洗牌的逆過程,指的是將map端的無規(guī)則輸出按指定的規(guī)則“打亂”成具有一定規(guī)則的數(shù)據(jù)浑娜,以便reduce端接收處理佑力。其在MapReduce中所處的工作階段是map輸出后到reduce接收前,具體可以分為map端和reduce端前后兩個部分筋遭。在shuffle之前搓萧,也就是在map階段,MapReduce會對要處理的數(shù)據(jù)進行分片(split)操作宛畦,為每一個分片分配一個MapTask任務瘸洛。接下來map()函數(shù)會對每一個分片中的每一行數(shù)據(jù)進行處理得到鍵值對(key,value),其中key為偏移量次和,value為一行的內(nèi)容反肋。此時得到的鍵值對又叫做“中間結(jié)果”。此后便進入shuffle階段踏施,由此可以看出shuffle階段的作用是處理“中間結(jié)果”石蔗。

block塊(物理劃分)

block是HDFS中的基本存儲單位罕邀,hadoop1.x默認大小為64M而hadoop2.x默認塊大小為128M。文件上傳到HDFS养距,就要劃分數(shù)據(jù)成塊诉探,這里的劃分屬于物理的劃分(實現(xiàn)機制也就是設置一個read方法,每次限制最多讀128M的數(shù)據(jù)后調(diào)用write進行寫入到hdfs)棍厌,塊的大小可通過 dfs.block.size配置肾胯。block采用冗余機制保證數(shù)據(jù)的安全:默認為3份,可通過dfs.replication配置耘纱。注意:當更改塊大小的配置后敬肚,新上傳的文件的塊大小為新配置的值,以前上傳的文件的塊大小為以前的配置值

split分片(邏輯劃分)

Hadoop中split劃分屬于邏輯上的劃分束析,目的只是為了讓map task更好地獲取數(shù)據(jù)艳馒。split是通過hadoop中的InputFormat接口中的getSplit()方法得到的。那么员寇,split的大小具體怎么得到呢弄慰?

首先介紹幾個數(shù)據(jù)量:

  • totalSize:整個mapreduce job輸入文件的總大小。

  • numSplits:來自job.getNumMapTasks()蝶锋,即在job啟動時用戶利用 org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設置的值陆爽,從方法的名稱上看,是用于設置map的個數(shù)牲览。但是墓陈,最終map的個數(shù)也就是split的個數(shù)并不一定取用戶設置的這個值,用戶設置的map個數(shù)值只是給最終的map個數(shù)一個提示第献,只是一個影響因素贡必,而不是決定因素。

  • goalSize:totalSize/numSplits庸毫,即期望的split的大小仔拟,也就是每個mapper處理多少的數(shù)據(jù)。但也僅僅是期望飒赃。

  • minSize:split的最小值利花,該值可由兩個途徑設置:

1.通過子類重寫方法protected void setMinSplitSize(long minSplitSize)進行設置。一般情況為1载佳,特殊情況除外

2.通過配置文件中的mapred.min.split.size進行設置

3.最終取兩者中的最大值炒事!
split計算公式:finalSplitSize=max(minSize,min(goalSize,blockSize))

shuffle流程概括

因為頻繁的磁盤I/O操作會嚴重的降低效率,因此“中間結(jié)果”不會立馬寫入磁盤蔫慧,而是優(yōu)先存儲到map節(jié)點的“環(huán)形內(nèi)存緩沖區(qū)”挠乳,在寫入的過程中進行分區(qū)(partition),也就是對于每個鍵值對來說,都增加了一個partition屬性值睡扬,然后連同鍵值對一起序列化成字節(jié)數(shù)組寫入到緩沖區(qū)(緩沖區(qū)采用的就是字節(jié)數(shù)組盟蚣,默認大小為100M)。當寫入的數(shù)據(jù)量達到預先設置的闕值后(mapreduce.map.io.sort.spill.percent,默認0.80卖怜,或者80%)便會啟動溢寫出線程緩沖區(qū)中的那部分數(shù)據(jù)溢出寫(spill)到磁盤的臨時文件中屎开,并在寫入前根據(jù)key進行排序(sort)和合并(combine,可選操作)马靠。溢出寫過程按輪詢方式將緩沖區(qū)中的內(nèi)容寫到mapreduce.cluster.local.dir屬性指定的目錄中奄抽。當整個map任務完成溢出寫后,會對磁盤中這個map任務產(chǎn)生的所有臨時文件(spill文件)進行歸并(merge)操作生成最終的正式輸出文件虑粥,此時的歸并是將所有spill文件中的相同partition合并到一起如孝,并對各個partition中的數(shù)據(jù)再進行一次排序(sort)宪哩,生成key和對應的value-list娩贷,文件歸并時,如果溢寫文件數(shù)量超過參數(shù)min.num.spills.for.combine的值(默認為3)時锁孟,可以再次進行合并彬祖。至此,map端shuffle過程結(jié)束品抽,接下來等待reduce task來拉取數(shù)據(jù)储笑。對于reduce端的shuffle過程來說,reduce task在執(zhí)行之前的工作就是不斷地拉取當前job里每個map task的最終結(jié)果圆恤,然后對從不同地方拉取過來的數(shù)據(jù)不斷地做merge最后合并成一個分區(qū)相同的大文件突倍,然后對這個文件中的鍵值對按照key進行sort排序,排好序之后緊接著進行分組盆昙,分組完成后才將整個文件交給reduce task處理

image.png

shuffle詳細流程

Map端shuffle

①分區(qū)partition

②寫入環(huán)形內(nèi)存緩沖區(qū)

③執(zhí)行溢出寫

  • 排序sort(根據(jù)key)--->合并combiner--->生成溢出寫文件

④歸并merge(也涉及到key的sort及combiner)

① 分區(qū)Partition

在將map()函數(shù)處理后得到的(key,value)對寫入到緩沖區(qū)之前羽历,需要先進行分區(qū)操作,這樣就能把map任務處理的結(jié)果發(fā)送給指定的reducer去執(zhí)行淡喜,從而達到負載均衡秕磷,避免數(shù)據(jù)傾斜。MapReduce提供默認的分區(qū)類(HashPartitioner)炼团,其核心代碼如下:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
 
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
 
}
②寫入環(huán)形內(nèi)存緩沖區(qū)

因為頻繁的磁盤I/O操作會嚴重的降低效率澎嚣,因此“中間結(jié)果”不會立馬寫入磁盤,而是優(yōu)先存儲到map節(jié)點的“環(huán)形內(nèi)存緩沖區(qū)”瘟芝,并做一些預排序以提高效率易桃,當寫入的數(shù)據(jù)量達到預先設置的闕值后便會執(zhí)行一次I/O操作將數(shù)據(jù)寫入到磁盤。每個map任務都會分配一個環(huán)形內(nèi)存緩沖區(qū)锌俱,用于存儲map任務輸出的鍵值對(默認大小100MB晤郑,mapreduce.task.io.sort.mb調(diào)整)以及對應的partition,被緩沖的(key,value)對已經(jīng)被序列化(為了寫入磁盤)

  • 個人總結(jié):寫入緩沖區(qū)前,先partion->根據(jù)partion及key排序預排序后->寫入磁盤中
③執(zhí)行溢寫出

一旦緩沖區(qū)內(nèi)容達到閾值(mapreduce.map.io.sort.spill.percent,默認0.80,或者80%)贩汉,就會會鎖定這80%的內(nèi)存驱富,并在每個分區(qū)中對其中的鍵值對按鍵進行sort排序,具體是將數(shù)據(jù)按照partition和key兩個關鍵字進行排序匹舞,排序結(jié)果為緩沖區(qū)內(nèi)的數(shù)據(jù)按照partition為單位聚集在一起褐鸥,同一個partition內(nèi)的數(shù)據(jù)按照key有序。排序完成后會創(chuàng)建一個溢出寫文件(臨時文件)赐稽,然后開啟一個后臺線程把這部分數(shù)據(jù)以一個臨時文件的方式溢出寫(spill)到本地磁盤中(如果客戶端自定義了Combiner(相當于map階段的reduce)叫榕,則會在分區(qū)排序后到溢寫出前自動調(diào)用combiner,將相同的key的value相加姊舵,這樣的好處就是減少溢寫到磁盤的數(shù)據(jù)量晰绎。這個過程叫“合并”)。剩余的20%的內(nèi)存在此期間可以繼續(xù)寫入map輸出的鍵值對括丁。溢出寫過程按輪詢方式將緩沖區(qū)中的內(nèi)容寫到mapreduce.cluster.local.dir屬性指定的目錄中

合并Combiner
如果指定了Combiner荞下,可能在兩個地方被調(diào)用:
1.當為作業(yè)設置Combiner類后,緩存溢出線程將緩存存放到磁盤時史飞,就會調(diào)用尖昏;
2.緩存溢出的數(shù)量超過mapreduce.map.combine.minspills(默認3)時,在緩存溢出文件合并的時候會調(diào)用

  • 合并(Combine)和歸并(Merge)的區(qū)別:
    兩個鍵值對<“a”,1>和<“a”,1>构资,如果合并抽诉,會得到<“a”,2>,如果歸并吐绵,會得到<“a”,<1,1>>

特殊情況:當數(shù)據(jù)量很小迹淌,達不到緩沖區(qū)闕值時,怎么處理己单?

對于這種情況唉窃,目前看到有兩種不一樣的說法:

   ①不會有寫臨時文件到磁盤的操作,也不會有后面的合并荷鼠。

   ②最終也會以臨時文件的形式存儲到本地磁盤

至于真實情況是怎么樣的句携,該文章的大佬說他也不清楚。允乐。矮嫉。

④歸并merge

當一個map task處理的數(shù)據(jù)很大,以至于超過緩沖區(qū)內(nèi)存時牍疏,就會生成多個spill文件蠢笋。此時就需要對同一個map任務產(chǎn)生的多個spill文件進行歸并生成最終的一個已分區(qū)且已排序的大文件。配置屬性mapreduce.task.io.sort.factor控制著一次最多能合并多少流鳞陨,默認值是10昨寞。這個過程包括排序和合并(可選)瞻惋,歸并得到的文件內(nèi)鍵值對有可能擁有相同的key,這個過程如果client設置過Combiner援岩,也會合并相同的key值的鍵值對(根據(jù)上面提到的combine的調(diào)用時機可知)歼狼。

溢出寫文件歸并完畢后,Map將刪除所有的臨時溢出寫文件享怀,并告知NodeManager任務已完成羽峰,只要其中一個MapTask完成,ReduceTask就開始復制它的輸出(Copy階段分區(qū)輸出文件通過http的方式提供給reducer)

壓縮
寫磁盤時壓縮map端的輸出添瓷,因為這樣會讓寫磁盤的速度更快梅屉,節(jié)約磁盤空間,并減少傳給reducer的數(shù)據(jù)量鳞贷。默認情況下坯汤,輸出是不壓縮的(將mapreduce.map.output.compress設置為true即可啟動)

Reduce端shuffle

①復制copy

②歸并merge

③reduce

結(jié)合下面這張圖可以直觀感受reduce端的shuffle過程


image.png
①復制copy

Reduce進程啟動一些數(shù)據(jù)copy線程,通過HTTP方式請求MapTask所在的NodeManager以獲取輸出文件搀愧。
NodeManager需要為分區(qū)文件運行reduce任務惰聂。并且reduce任務需要集群上若干個map任務的map輸出作為其特殊的分區(qū)文件。而每個map任務的完成時間可能不同妈橄,因此只要有一個任務完成庶近,reduce任務就開始復制其輸出

reduce任務有少量復制線程翁脆,因此能夠并行取得map輸出眷蚓。默認線程數(shù)為5,但這個默認值可以通過mapreduce.reduce.shuffle.parallelcopies屬性進行設置反番。

【Reducer如何知道自己應該處理哪些數(shù)據(jù)呢沙热?】
因為Map端進行partition的時候,實際上就相當于指定了每個Reducer要處理的數(shù)據(jù)(partition就對應了Reducer)罢缸,所以Reducer在拷貝數(shù)據(jù)的時候只需拷貝與自己對應的partition中的數(shù)據(jù)即可篙贸。每個Reducer會處理一個或者多個partition。

【reducer如何知道要從哪臺機器上去的map輸出呢枫疆?】
map任務完成后爵川,它們會使用心跳機制通知它們的application master、因此對于指定作業(yè)息楔,application master知道m(xù)ap輸出和主機位置之間的映射關系寝贡。reducer中的一個線程定期詢問master以便獲取map輸出主機的位置。知道獲得所有輸出位置

②歸并merge

Copy 過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中值依,這里的緩沖區(qū)大小要比 map 端的更為靈活圃泡,它基于 JVM 的 heap size 設置,因為 Shuffle 階段 Reducer 不運行愿险,所以應該把絕大部分的內(nèi)存都給 Shuffle 用颇蜡。

Copy過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中,如果內(nèi)存緩沖區(qū)中能放得下這次數(shù)據(jù)的話就直接把數(shù)據(jù)寫到內(nèi)存中,即內(nèi)存到內(nèi)存merge风秤。Reduce要向每個Map去拖取數(shù)據(jù)鳖目,在內(nèi)存中每個Map對應一塊數(shù)據(jù),當內(nèi)存緩存區(qū)中存儲的Map數(shù)據(jù)占用空間達到一定程度的時候缤弦,開始啟動內(nèi)存中merge疑苔,把內(nèi)存中的數(shù)據(jù)merge輸出到磁盤上一個文件中,即內(nèi)存到磁盤merge甸鸟。與map端的溢寫類似惦费,在將buffer中多個map輸出合并寫入磁盤之前,如果設置了Combiner抢韭,則會化簡壓縮合并map輸出薪贫。Reduce的內(nèi)存緩沖區(qū)可通過mapred.job.shuffle.input.buffer.percent配置,默認是JVM的heap size的70%刻恭。內(nèi)存到磁盤merge的啟動門限可以通過mapred.job.shuffle.merge.percent配置瞧省,默認是66%

當屬于該reducer的map輸出全部拷貝完成,則會在reducer上生成多個文件(如果拖取的所有map數(shù)據(jù)總量都沒有內(nèi)存緩沖區(qū)鳍贾,則數(shù)據(jù)就只存在于內(nèi)存中)鞍匾,這時開始執(zhí)行合并操作,即磁盤到磁盤merge骑科,Map的輸出數(shù)據(jù)已經(jīng)是有序的橡淑,Merge進行一次合并排序,所謂Reduce端的sort過程就是這個合并的過程咆爽,采取的排序方法跟map階段不同梁棠,因為每個map端傳過來的數(shù)據(jù)是排好序的,因此眾多排好序的map輸出文件在reduce端進行合并時采用的是歸并排序(MAP端shaffer 是堆排序)斗埂,針對鍵進行歸并排序符糊。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的呛凶。最終Reduce shuffle過程會輸出一個整體有序的數(shù)據(jù)塊

③reduce

當一個reduce任務完成全部的復制和排序后男娄,就會針對已根據(jù)鍵排好序的Key構(gòu)造對應的Value迭代器。這時就要用到分組漾稀,默認的根據(jù)鍵分組模闲,自定義的可是使用 job.setGroupingComparatorClass()方法設置分組函數(shù)類。對于默認分組來說县好,只要這個比較器比較的兩個Key相同围橡,它們就屬于同一組,它們的 Value就會放在一個Value迭代器缕贡,而這個迭代器的Key使用屬于同一個組的所有Key的第一個Key翁授。

在reduce階段拣播,reduce()方法的輸入是所有的Key和它的Value迭代器。此階段的輸出直接寫到輸出文件系統(tǒng)收擦,一般為HDFS贮配。如果采用HDFS,由于NodeManager也運行數(shù)據(jù)節(jié)點塞赂,所以第一個塊副本將被寫到本地磁盤泪勒。

1、當reduce將所有的map上對應自己partition的數(shù)據(jù)下載完成后宴猾,reducetask真正進入reduce函數(shù)的計算階段圆存。由于reduce計算時同樣是需要內(nèi)存作為buffer,可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代碼MergeManagerImpl.java:674行)來設置reduce的緩存仇哆。

這個參數(shù)默認情況下為0沦辙,也就是說,reduce是全部從磁盤開始讀處理數(shù)據(jù)讹剔。如果這個參數(shù)大于0油讯,那么就會有一定量的數(shù)據(jù)被緩存在內(nèi)存并輸送給reduce,當reduce計算邏輯消耗內(nèi)存很小時延欠,可以分一部分內(nèi)存用來緩存數(shù)據(jù)陌兑,可以提升計算的速度。所以默認情況下都是從磁盤讀取數(shù)據(jù)由捎,如果內(nèi)存足夠大的話兔综,務必設置該參數(shù)讓reduce直接從緩存讀數(shù)據(jù),這樣做就有點Spark Cache的感覺隅俘。

2邻奠、Reduce在這個階段,框架為已分組的輸入數(shù)據(jù)中的每個鍵值對對調(diào)用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法为居。Reduce任務的輸出通常是通過調(diào)用 OutputCollector.collect(WritableComparable,Writable)寫入文件系統(tǒng)的。

參考自
MapReduce shuffle過程詳解
MapReduce的shuffle過程詳解

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末杀狡,一起剝皮案震驚了整個濱河市蒙畴,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌呜象,老刑警劉巖膳凝,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異恭陡,居然都是意外死亡蹬音,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門休玩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來著淆,“玉大人劫狠,你說我怎么就攤上這事∮啦浚” “怎么了独泞?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長苔埋。 經(jīng)常有香客問我懦砂,道長,這世上最難降的妖魔是什么组橄? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任荞膘,我火速辦了婚禮,結(jié)果婚禮上玉工,老公的妹妹穿的比我還像新娘衫画。我一直安慰自己,他們只是感情好瓮栗,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布削罩。 她就那樣靜靜地躺著,像睡著了一般费奸。 火紅的嫁衣襯著肌膚如雪弥激。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天愿阐,我揣著相機與錄音微服,去河邊找鬼。 笑死缨历,一個胖子當著我的面吹牛以蕴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播辛孵,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼丛肮,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了魄缚?” 一聲冷哼從身側(cè)響起宝与,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎冶匹,沒想到半個月后习劫,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡嚼隘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年眷茁,在試婚紗的時候發(fā)現(xiàn)自己被綠了啼器。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡伴榔,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布幢炸,位于F島的核電站,受9級特大地震影響拒贱,放射性物質(zhì)發(fā)生泄漏宛徊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一逻澳、第九天 我趴在偏房一處隱蔽的房頂上張望闸天。 院中可真熱鬧,春花似錦斜做、人聲如沸苞氮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽笼吟。三九已至,卻和暖如春霸旗,著一層夾襖步出監(jiān)牢的瞬間贷帮,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工诱告, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留撵枢,地道東北人。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓精居,卻偏偏與公主長得像锄禽,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子靴姿,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354