shuffle過程
shuffle概念
shuffle的本意是洗牌堰汉、混洗的意思,把一組有規(guī)則的數(shù)據(jù)盡量打亂成無規(guī)則的數(shù)據(jù)。而在MapReduce中审孽,shuffle更像是洗牌的逆過程,指的是將map端的無規(guī)則輸出按指定的規(guī)則“打亂”成具有一定規(guī)則的數(shù)據(jù)浑娜,以便reduce端接收處理佑力。其在MapReduce中所處的工作階段是map輸出后到reduce接收前,具體可以分為map端和reduce端前后兩個部分筋遭。在shuffle之前搓萧,也就是在map階段,MapReduce會對要處理的數(shù)據(jù)進行分片(split)操作宛畦,為每一個分片分配一個MapTask任務瘸洛。接下來map()函數(shù)會對每一個分片中的每一行數(shù)據(jù)進行處理得到鍵值對(key,value),其中key為偏移量次和,value為一行的內(nèi)容反肋。此時得到的鍵值對又叫做“中間結(jié)果”。此后便進入shuffle階段踏施,由此可以看出shuffle階段的作用是處理“中間結(jié)果”石蔗。
block塊(物理劃分)
block是HDFS中的基本存儲單位罕邀,hadoop1.x默認大小為64
M而hadoop2.x默認塊大小為128
M。文件上傳到HDFS养距,就要劃分數(shù)據(jù)成塊诉探,這里的劃分屬于物理的劃分(實現(xiàn)機制也就是設置一個read
方法,每次限制最多讀128M
的數(shù)據(jù)后調(diào)用write
進行寫入到hdfs
)棍厌,塊的大小可通過 dfs.block.size
配置肾胯。block采用冗余機制保證數(shù)據(jù)的安全:默認為3份,可通過dfs.replication配置耘纱。注意:當更改塊大小的配置后敬肚,新上傳的文件的塊大小為新配置的值,以前上傳的文件的塊大小為以前的配置值
split分片(邏輯劃分)
Hadoop中split
劃分屬于邏輯
上的劃分束析,目的只是為了讓map task
更好地獲取數(shù)據(jù)艳馒。split是通過hadoop中的InputFormat
接口中的getSplit()
方法得到的。那么员寇,split的大小具體怎么得到呢弄慰?
首先介紹幾個數(shù)據(jù)量:
totalSize:整個mapreduce job輸入文件的總大小。
numSplits:來自job.getNumMapTasks()蝶锋,即在job啟動時用戶利用 org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設置的值陆爽,從方法的名稱上看,是用于設置map的個數(shù)牲览。但是墓陈,最終map的個數(shù)也就是split的個數(shù)并不一定取用戶設置的這個值,用戶設置的map個數(shù)值只是給最終的map個數(shù)一個提示第献,只是一個影響因素贡必,而不是決定因素。
goalSize:totalSize/numSplits庸毫,即期望的split的大小仔拟,也就是每個mapper處理多少的數(shù)據(jù)。但也僅僅是期望飒赃。
minSize:split的最小值利花,該值可由兩個途徑設置:
1.通過子類重寫方法protected void setMinSplitSize(long minSplitSize)進行設置。一般情況為1载佳,特殊情況除外
2.通過配置文件中的mapred.min.split.size進行設置
3.最終取兩者中的最大值炒事!
split計算公式:finalSplitSize=max(minSize,min(goalSize,blockSize))
shuffle流程概括
因為頻繁的磁盤I/O操作會嚴重的降低效率,因此“中間結(jié)果”不會立馬寫入磁盤蔫慧,而是優(yōu)先存儲到map節(jié)點的“環(huán)形內(nèi)存緩沖區(qū)”
挠乳,在寫入的過程中進行分區(qū)
(partition),也就是對于每個鍵值對來說,都增加了一個partition屬性值
睡扬,然后連同鍵值對一起序列化成字節(jié)數(shù)組寫入到緩沖區(qū)
(緩沖區(qū)采用的就是字節(jié)數(shù)組盟蚣,默認大小為100M
)。當寫入的數(shù)據(jù)量達到預先設置的闕值后(mapreduce.map.io.sort.spill.percent
,默認0.80卖怜,或者80%)便會啟動溢寫出線程
將緩沖區(qū)
中的那部分數(shù)據(jù)溢出寫(spill)到磁盤
的臨時文件中屎开,并在寫入前根據(jù)key進行排序
(sort)和合并
(combine,可選操作)马靠。溢出寫過程按輪詢方式將緩沖區(qū)中的內(nèi)容寫到mapreduce.cluster.local.dir屬性指定的目錄中奄抽。當整個map任務完成溢出寫后,會對磁盤中這個map任務產(chǎn)生的所有臨時文件(spill文件
)進行歸并
(merge)操作生成最終的正式輸出文件虑粥,此時的歸并是將所有spill文件中的相同partition合并到一起如孝,并對各個partition中的數(shù)據(jù)再進行一次排序(sort)宪哩,生成key和對應的value-list娩贷,文件歸并時,如果溢寫文件數(shù)量超過參數(shù)min.num.spills.for.combine
的值(默認為3)時锁孟,可以再次進行合并
彬祖。至此,map端shuffle過程結(jié)束品抽,接下來等待reduce task來拉取數(shù)據(jù)储笑。對于reduce端的shuffle過程來說,reduce task在執(zhí)行之前的工作就是不斷地拉取當前job里每個map task的最終結(jié)果圆恤,然后對從不同地方拉取過來的數(shù)據(jù)不斷地做merge最后合并成一個分區(qū)相同的大文件突倍,然后對這個文件中的鍵值對按照key
進行sort排序
,排好序之后緊接著進行分組
盆昙,分組完成后才將整個文件
交給reduce task
處理
shuffle詳細流程
Map端shuffle
①分區(qū)partition
②寫入環(huán)形內(nèi)存緩沖區(qū)
③執(zhí)行溢出寫
- 排序sort(根據(jù)key)--->合并combiner--->生成溢出寫文件
④歸并merge(也涉及到key的sort及combiner)
① 分區(qū)Partition
在將map()
函數(shù)處理后得到的(key,value)
對寫入到緩沖區(qū)之前羽历,需要先進行分區(qū)
操作,這樣就能把map任務處理的結(jié)果發(fā)送給指定的reducer去執(zhí)行淡喜,從而達到負載均衡秕磷,避免數(shù)據(jù)傾斜。MapReduce提供默認的分區(qū)類(HashPartitioner)炼团,其核心代碼如下:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
②寫入環(huán)形內(nèi)存緩沖區(qū)
因為頻繁的磁盤I/O操作會嚴重的降低效率澎嚣,因此“中間結(jié)果”不會立馬寫入磁盤,而是優(yōu)先存儲到map節(jié)點的“環(huán)形內(nèi)存緩沖區(qū)
”瘟芝,并做一些預排序
以提高效率易桃,當寫入的數(shù)據(jù)量達到預先設置的闕值后便會執(zhí)行一次I/O操作將數(shù)據(jù)寫入到磁盤。每個map任務
都會分配一個環(huán)形內(nèi)存緩沖區(qū)
锌俱,用于存儲map任務輸出的鍵值對(默認大小100
MB晤郑,mapreduce.task.io.sort.mb調(diào)整)以及對應的partition
,被緩沖的(key,value)對
已經(jīng)被序列化(為了寫入磁盤)
- 個人總結(jié):寫入緩沖區(qū)前,先partion->根據(jù)partion及key排序預排序后->寫入磁盤中
③執(zhí)行溢寫出
一旦緩沖區(qū)
內(nèi)容達到閾值
(mapreduce.map.io.sort.spill.percent,默認0.80
,或者80%)贩汉,就會會鎖定這80%的內(nèi)存驱富,并在每個分區(qū)
中對其中的鍵值
對按鍵進行sort排序
,具體是將數(shù)據(jù)按照partition和key
兩個關鍵字進行排序匹舞,排序結(jié)果為緩沖區(qū)內(nèi)的數(shù)據(jù)按照partition為單位聚集在一起褐鸥,同一個partition內(nèi)的數(shù)據(jù)按照key有序。排序完成后會創(chuàng)建一個溢出寫文件
(臨時文件)赐稽,然后開啟一個后臺線程
把這部分數(shù)據(jù)以一個臨時文件的方式溢出寫(spill
)到本地磁盤
中(如果客戶端自定義了Combiner
(相當于map階段的reduce)叫榕,則會在分區(qū)排序后到溢寫出前自動調(diào)用combiner
,將相同的key的value相加
姊舵,這樣的好處就是減少溢寫到磁盤的數(shù)據(jù)量晰绎。這個過程叫“合并
”)。剩余的20%的內(nèi)存在此期間可以繼續(xù)寫入map輸出的鍵值對括丁。溢出寫過程按輪詢方式將緩沖區(qū)中的內(nèi)容寫到mapreduce.cluster.local.dir屬性指定的目錄中
合并Combiner
如果指定了Combiner荞下,可能在兩個地方被調(diào)用:
1.當為作業(yè)設置Combiner類后,緩存溢出
線程將緩存存放到磁盤時史飞,就會調(diào)用尖昏;
2.緩存溢出的數(shù)量超過mapreduce.map.combine.minspills(默認3)時,在緩存溢出文件合并
的時候會調(diào)用
- 合并(Combine)和歸并(Merge)的區(qū)別:
兩個鍵值對<“a”,1>和<“a”,1>构资,如果合并抽诉,會得到<“a”,2>,如果歸并吐绵,會得到<“a”,<1,1>>
特殊情況:當數(shù)據(jù)量很小迹淌,達不到緩沖區(qū)闕值時,怎么處理己单?
對于這種情況唉窃,目前看到有兩種不一樣的說法:
①不會有寫臨時文件到磁盤的操作,也不會有后面的合并荷鼠。
②最終也會以臨時文件的形式存儲到本地磁盤
至于真實情況是怎么樣的句携,該文章的大佬說他也不清楚。允乐。矮嫉。
④歸并merge
當一個map task處理的數(shù)據(jù)很大,以至于超過緩沖區(qū)內(nèi)存時牍疏,就會生成多個spill文件蠢笋。此時就需要對同一個map任務產(chǎn)生的多個spill文件
進行歸并
生成最終的一個已分區(qū)且已排序的大文件。配置屬性mapreduce.task.io.sort.factor控制著一次最多能合并多少流鳞陨,默認值是10昨寞。這個過程包括排序和合并
(可選)瞻惋,歸并得到的文件內(nèi)鍵值對有可能擁有相同的key,這個過程如果client設置過Combiner援岩,也會合并相同的key值的鍵值對
(根據(jù)上面提到的combine的調(diào)用時機可知)歼狼。
溢出寫文件歸并完畢后,Map將刪除所有的臨時溢出寫文件享怀,并告知NodeManager任務已完成羽峰,只要其中一個MapTask完成,ReduceTask就開始復制它的輸出(Copy階段分區(qū)輸出文件通過http的方式提供給reducer)
壓縮
寫磁盤時壓縮map端的輸出添瓷,因為這樣會讓寫磁盤的速度更快梅屉,節(jié)約磁盤空間,并減少傳給reducer的數(shù)據(jù)量鳞贷。默認情況下坯汤,輸出是不壓縮的(將mapreduce.map.output.compress設置為true即可啟動)
Reduce端shuffle
①復制copy
②歸并merge
③reduce
結(jié)合下面這張圖可以直觀感受reduce端的shuffle過程
①復制copy
Reduce進程
啟動一些數(shù)據(jù)copy線程
,通過HTTP
方式請求MapTask所在的NodeManager以獲取輸出文件搀愧。
NodeManager需要為分區(qū)文件運行reduce任務惰聂。并且reduce任務需要集群上若干個map任務的map輸出作為其特殊的分區(qū)文件。而每個map任務的完成時間可能不同妈橄,因此只要有一個任務完成庶近,reduce任務就開始復制其輸出
reduce任務有少量復制線程翁脆,因此能夠并行取得map輸出眷蚓。默認線程數(shù)為5,但這個默認值可以通過mapreduce.reduce.shuffle.parallelcopies屬性進行設置反番。
【Reducer如何知道自己應該處理哪些數(shù)據(jù)呢沙热?】
因為Map端進行partition的時候,實際上就相當于指定了每個Reducer要處理的數(shù)據(jù)(partition就對應了Reducer)罢缸,所以Reducer在拷貝數(shù)據(jù)的時候只需拷貝與自己對應的partition中的數(shù)據(jù)即可篙贸。每個Reducer會處理一個或者多個partition。
【reducer如何知道要從哪臺機器上去的map輸出呢枫疆?】
map任務完成后爵川,它們會使用心跳機制通知它們的application master、因此對于指定作業(yè)息楔,application master知道m(xù)ap輸出和主機位置之間的映射關系寝贡。reducer中的一個線程定期詢問master以便獲取map輸出主機的位置。知道獲得所有輸出位置
②歸并merge
Copy 過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中值依,這里的緩沖區(qū)大小要比 map 端的更為靈活圃泡,它基于 JVM 的 heap size 設置,因為 Shuffle 階段 Reducer 不運行愿险,所以應該把絕大部分的內(nèi)存都給 Shuffle 用颇蜡。
Copy過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中,如果內(nèi)存緩沖區(qū)中能放得下這次數(shù)據(jù)的話就直接把數(shù)據(jù)寫到內(nèi)存中,即內(nèi)存到內(nèi)存merge风秤。Reduce要向每個Map去拖取數(shù)據(jù)鳖目,在內(nèi)存中每個Map對應一塊數(shù)據(jù),當內(nèi)存緩存區(qū)中存儲的Map數(shù)據(jù)占用空間達到一定程度的時候缤弦,開始啟動內(nèi)存中merge疑苔,把內(nèi)存中的數(shù)據(jù)merge輸出到磁盤上一個文件中,即內(nèi)存到磁盤merge甸鸟。與map端的溢寫類似惦费,在將buffer中多個map輸出合并寫入磁盤之前,如果設置了Combiner抢韭,則會化簡壓縮合并的map輸出薪贫。Reduce的內(nèi)存緩沖區(qū)可通過mapred.job.shuffle.input.buffer.percent配置,默認是JVM的heap size的70%刻恭。內(nèi)存到磁盤merge的啟動門限可以通過mapred.job.shuffle.merge.percent配置瞧省,默認是66%
當屬于該reducer的map輸出全部拷貝完成,則會在reducer上生成多個文件(如果拖取的所有map數(shù)據(jù)總量都沒有內(nèi)存緩沖區(qū)鳍贾,則數(shù)據(jù)就只存在于內(nèi)存中)鞍匾,這時開始執(zhí)行合并
操作,即磁盤到磁盤merge
骑科,Map的輸出數(shù)據(jù)已經(jīng)是有序的橡淑,Merge進行一次合并排序
,所謂Reduce端的sort過程就是這個合并
的過程咆爽,采取的排序方法跟map階段不同梁棠,因為每個map端傳過來的數(shù)據(jù)是排好序的,因此眾多排好序的map輸出文件在reduce端進行合并時采用的是歸并排序
(MAP端shaffer 是堆排序)斗埂,針對鍵進行歸并排序符糊。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的呛凶。最終Reduce shuffle過程會輸出一個整體有序的數(shù)據(jù)塊
③reduce
當一個reduce任務完成全部的復制和排序后男娄,就會針對已根據(jù)鍵排好序的Key構(gòu)造對應的Value迭代器。這時就要用到分組漾稀,默認的根據(jù)鍵分組模闲,自定義的可是使用 job.setGroupingComparatorClass()
方法設置分組函數(shù)類。對于默認分組來說县好,只要這個比較器比較的兩個Key相同围橡,它們就屬于同一組,它們的 Value就會放在一個Value迭代器
缕贡,而這個迭代器的Key使用屬于同一個組的所有Key的第一個Key翁授。
在reduce階段拣播,reduce()方法的輸入是所有的Key和它的Value迭代器。此階段的輸出直接寫到輸出文件系統(tǒng)收擦,一般為HDFS贮配。如果采用HDFS,由于NodeManager也運行數(shù)據(jù)節(jié)點塞赂,所以第一個塊副本將被寫到本地磁盤泪勒。
1、當reduce將所有的map上對應自己partition的數(shù)據(jù)下載完成后宴猾,reducetask真正進入reduce函數(shù)的計算階段圆存。由于reduce計算時同樣是需要內(nèi)存作為buffer,可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代碼MergeManagerImpl.java:674行)來設置reduce的緩存仇哆。
這個參數(shù)默認
情況下為0
沦辙,也就是說,reduce是全部從磁盤開始讀處理數(shù)據(jù)讹剔。如果這個參數(shù)大于0油讯,那么就會有一定量的數(shù)據(jù)被緩存在內(nèi)存并輸送給reduce,當reduce計算邏輯消耗內(nèi)存很小時延欠,可以分一部分內(nèi)存用來緩存數(shù)據(jù)陌兑,可以提升計算的速度。所以默認情況下都是從磁盤讀取數(shù)據(jù)由捎,如果內(nèi)存足夠大的話兔综,務必設置該參數(shù)讓reduce直接從緩存讀數(shù)據(jù),這樣做就有點Spark Cache的感覺隅俘。
2邻奠、Reduce在這個階段,框架為已分組的輸入數(shù)據(jù)中的每個鍵值對對調(diào)用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法为居。Reduce任務的輸出通常是通過調(diào)用 OutputCollector.collect(WritableComparable,Writable)寫入文件系統(tǒng)的。