目錄
1. Shuffle的引入
2. Shuffle過(guò)程
3.Map端Shuffle
4. Reduce Shuffle
1. Shuffle的引入
Map 是映射多糠,負(fù)責(zé)數(shù)據(jù)的過(guò)濾分法工秩,將原始數(shù)據(jù)轉(zhuǎn)化為鍵值對(duì);
Reduce 是合并,將具有相同的 key 值的 value 進(jìn)行處理后再輸出新的鍵值對(duì)作為最終結(jié) 果。
為了讓 Reduce 可以并行處理 Map 的結(jié)果醇坝,必須對(duì) Map 的輸出進(jìn)行一定 的排序與分割扰柠,然后再交給對(duì)應(yīng)的 Reduce,Map 端的輸出作為 Reduce 的輸 入的過(guò)程叫做 Shuffle.
在了解shuffle的具體流程之前藐吮,應(yīng)先對(duì)以下兩個(gè)概念有所了解:
block塊(物理劃分)
block是HDFS中的基本存儲(chǔ)單位溺拱,默認(rèn)大小為64M。文件上傳到HDFS谣辞,就要?jiǎng)澐謹(jǐn)?shù)據(jù)成塊迫摔,這里的劃分屬于物理的劃分,塊的大小可通過(guò) dfs.block.size配置泥从。block采用冗余機(jī)制保證數(shù)據(jù)的安全:默認(rèn)為3份句占,可通過(guò)dfs.replication配置。
split分片(邏輯劃分)
Hadoop中split劃分屬于邏輯上的劃分躯嫉,目的只是為了讓map task更好地獲取數(shù)據(jù)纱烘。split是通過(guò)hadoop中的InputFormat接口中的getSplit()方法得到的杨拐。
2. Shuffle過(guò)程
如圖所示,這是Shuffle的整個(gè)流程擂啥,我們一步一步解釋:
Shuffle 過(guò)程包含在 Map 和 Reduce 兩端哄陶,即 Map shuffle 和 Reduce shuffle。
3. Map端shuffle
分區(qū)partition
寫(xiě)入環(huán)形內(nèi)存緩沖區(qū)
執(zhí)行溢出寫(xiě):排序sort--->合并combiner--->生成溢出寫(xiě)文件
歸并merge
(1)分區(qū)Partition
在將map()函數(shù)處理后得到的(key,value)對(duì)寫(xiě)入到緩沖區(qū)之前哺壶,需要先進(jìn)行分區(qū)操作屋吨,這樣就能把map任務(wù)處理的結(jié)果發(fā)送給指定的reducer去執(zhí)行,從而達(dá)到負(fù)載均衡山宾,避免數(shù)據(jù)傾斜至扰。MapReduce提供默認(rèn)的分區(qū)類(HashPartitioner)。
簡(jiǎn)單理解就是按照統(tǒng)計(jì)結(jié)果按照條件輸入到不同文件當(dāng)中(分區(qū))资锰。比如將手機(jī)號(hào)按照 135 137 187開(kāi)頭的分別放到一個(gè)獨(dú)立的文件當(dāng)中 敢课,其他的放到一個(gè)文件夾。
(2)寫(xiě)入環(huán)形內(nèi)存緩沖區(qū)
因?yàn)轭l繁的磁盤(pán)I/O操作會(huì)嚴(yán)重的降低效率台妆,因此“中間結(jié)果”不會(huì)立馬寫(xiě)入磁盤(pán)翎猛,而是優(yōu)先存儲(chǔ)到map節(jié)點(diǎn)的“環(huán)形內(nèi)存緩沖區(qū)”,并做一些預(yù)排序以提高效率接剩,當(dāng)寫(xiě)入的數(shù)據(jù)量達(dá)到預(yù)先設(shè)置的闕值后便會(huì)執(zhí)行一次I/O操作將數(shù)據(jù)寫(xiě)入到磁盤(pán)切厘。
這個(gè)閾值一般是0.8,也就是80MB懊缺,另外的20%內(nèi)存可以繼續(xù)寫(xiě)入要寫(xiě)進(jìn)磁盤(pán)的數(shù)據(jù)疫稿,寫(xiě)入磁盤(pán)和寫(xiě)入內(nèi)存操作是互不干擾的*,如果緩存區(qū)被撐滿了鹃两,那么map就會(huì)阻塞寫(xiě)入內(nèi)存的操作遗座,讓寫(xiě)入磁盤(pán)操作完成后再繼續(xù)執(zhí)行寫(xiě)入內(nèi)存操作
(3)執(zhí)行溢寫(xiě)出
一旦緩沖區(qū)內(nèi)容達(dá)到閾值,就會(huì)鎖定這80%的內(nèi)存俊扳,并在每個(gè)分區(qū)中對(duì)其中的鍵值對(duì)按鍵進(jìn)行sort排序
具體是將數(shù)據(jù)按照partition和key兩個(gè)關(guān)鍵字進(jìn)行排序途蒋,排序結(jié)果為緩沖區(qū)內(nèi)的數(shù)據(jù)按照partition為單位聚集在一起,同一個(gè)partition內(nèi)的數(shù)據(jù)按照key有序馋记。排序完成后會(huì)創(chuàng)建一個(gè)溢出寫(xiě)文件(臨時(shí)文件)号坡,然后開(kāi)啟一個(gè)后臺(tái)線程把這部分?jǐn)?shù)據(jù)以一個(gè)臨時(shí)文件的方式溢出寫(xiě)(spill)到本地磁盤(pán)中
如果客戶端自定義了Combiner(相當(dāng)于map階段的reduce),則會(huì)在分區(qū)排序后到溢寫(xiě)出前自動(dòng)調(diào)用combiner梯醒,將相同的key的value相加宽堆,這樣的好處就是減少溢寫(xiě)到磁盤(pán)的數(shù)據(jù)量。這個(gè)過(guò)程叫“合并”
剩余的20%的內(nèi)存在此期間可以繼續(xù)寫(xiě)入map輸出的鍵值對(duì)茸习。溢出寫(xiě)過(guò)程按輪詢方式將緩沖區(qū)中的內(nèi)容寫(xiě)到mapreduce.cluster.local.dir屬性指定的目錄中畜隶。
(4)歸并merge
當(dāng)一個(gè)map task處理的數(shù)據(jù)很大,以至于超過(guò)緩沖區(qū)內(nèi)存時(shí),就會(huì)生成多個(gè)spill文件籽慢。此時(shí)就需要對(duì)同一個(gè)map任務(wù)產(chǎn)生的多個(gè)spill文件進(jìn)行歸并生成最終的一個(gè)已分區(qū)且已排序的大文件浸遗。
歸并得到的文件內(nèi)鍵值對(duì)有可能擁有相同的key,這個(gè)過(guò)程如果client設(shè)置過(guò)Combiner嗡综,也會(huì)合并相同的key值的鍵值對(duì)(根據(jù)上面提到的combine的調(diào)用時(shí)機(jī)可知)乙帮。
(5)壓縮
寫(xiě)磁盤(pán)時(shí)壓縮map端的輸出,因?yàn)檫@樣會(huì)讓寫(xiě)磁盤(pán)的速度更快极景,節(jié)約磁盤(pán)空間察净,并減少傳給reducer的數(shù)據(jù)量。默認(rèn)情況下盼樟,輸出是不壓縮的
4. reduce shuffle
reduce shuffle 有復(fù)制和合并氢卡。
Reduce 通過(guò)*** Http 的方式從 map 獲取數(shù)據(jù)***, reduce 有少量的復(fù)制線程晨缴,可以并行的從 map 上復(fù)制數(shù)據(jù)译秦。
Reduce 可能需要 從多個(gè) map 任務(wù)中獲取數(shù)據(jù),因此只要多個(gè) map 中的一個(gè)完成击碗,reduce 便可 以從 map 復(fù)制數(shù)據(jù)筑悴。
如果 map 的輸出數(shù)據(jù)較小,會(huì)直接復(fù)制到內(nèi)存;如果數(shù) 據(jù)比較大稍途,當(dāng)達(dá)到緩沖區(qū)閾值時(shí)會(huì)溢出到磁盤(pán)阁吝,最后會(huì)排序合并這些溢出文件。