Spark shuffle是什么
Shuffle在Spark中即是把父RDD中的KV對按照Key重新分區(qū)强衡,從而得到一個新的RDD。也就是說原本同屬于父RDD同一個分區(qū)的數(shù)據(jù)需要進入到子RDD的不同的分區(qū)感挥。
現(xiàn)在的spark版本默認(rèn)使用的是sortshuffle;
shuffle在哪里產(chǎn)生
shuffle在spark的算子中產(chǎn)生,也就是運行task的時候才會產(chǎn)生shuffle.
sortShuffleManager
spark shuffle的默認(rèn)計算引擎叫sortshuffleManager,它負(fù)責(zé)shuffle過程的執(zhí)行越败、計算和組件的處理,sortshuffleManager會將task進行shuffle操作時產(chǎn)生的臨時磁盤文件合并成一個磁盤文件置谦,在下一個stage的shuffle read task拉取自己的數(shù)據(jù)時亿傅,只要根據(jù)索引讀取每個磁盤文件中的部分?jǐn)?shù)據(jù)即可。
sortshuffle的內(nèi)部機制
1. 數(shù)據(jù)會根據(jù)不同的shuffle算子存儲到map數(shù)據(jù)結(jié)構(gòu)(如reduceByKey)或者array數(shù)據(jù)結(jié)構(gòu)(join);不過Map是一邊聚合,一邊寫入內(nèi)存,array是直接寫入內(nèi)存. 當(dāng)內(nèi)存達到一個閾值,就會溢出寫到磁盤,因此在溢出這個環(huán)節(jié)會在磁盤上產(chǎn)生多個臨時文件,磁盤上的這些文件需要合并,于是spark就有了merge機制.
2. 在溢寫到磁盤之前,在內(nèi)存中會按照key來排序,排序過后會進入到一個buffer緩沖區(qū),默認(rèn)為32K,緩沖區(qū)的batch默認(rèn)為1萬條key,也就是緩沖區(qū)以每次一萬條的量寫入到磁盤文件中,該緩沖區(qū)減少IO,提高性能. 緩沖區(qū)和寫入磁盤使用的技術(shù)是java中的BufferedOutputStream.
3. merge會將之前產(chǎn)生的所有的臨時文件進行合并,包括緩沖區(qū)讀寫到磁盤上的文件,合并成一個大的文件到磁盤,默認(rèn)為48M,與這個文件相對于的還有一個索引文件,索引文件里面記錄的是這個文件的元信息,且這個磁盤文件也是下游stage的Task的輸入信息! ?? 注: 一個下游的task對應(yīng)一個磁盤文件和這個磁盤文件的元信息. 于是就有了血統(tǒng),繼承之類的!
shuffle當(dāng)中可能會遇到的問題
1. 數(shù)據(jù)量非常大,從其他各臺機器收集數(shù)據(jù)占用大量網(wǎng)絡(luò)签餐。
2. 數(shù)據(jù)如何分類,即如何Partition氯檐,Hash冠摄、Sort等;
3. 負(fù)載均衡(數(shù)據(jù)傾斜)耗拓,因為采用不同的Shuffle方式對數(shù)據(jù)不同的分類,而分類之后又要跑到具體的節(jié)點上計算樟插,如果不恰當(dāng)?shù)脑捀偷螅苋菀桩a(chǎn)生數(shù)據(jù)傾斜;
4. 網(wǎng)絡(luò)傳輸效率鸵熟,需要在壓縮和解壓縮之間做出權(quán)衡负甸,序列化和反序列也是要考慮的問題;
說明:具體的Task進行計算的時候盡一切最大可能使得數(shù)據(jù)具備Process Locality的特性打月;退而求次是增加數(shù)據(jù)分片蚕捉,減少每個Task處理的數(shù)據(jù)量。
shuffle調(diào)優(yōu)
shuffle調(diào)優(yōu)分為兩種,一種是shuffle參數(shù)根據(jù)實際情況調(diào)優(yōu),一種是代碼開發(fā)調(diào)優(yōu),代碼開發(fā)調(diào)優(yōu)我在spark性能調(diào)優(yōu)里面去寫!
1. spark.shuffle.file.buffer(默認(rèn)值為32K,每次出貨1萬條)
該參數(shù)是緩沖區(qū)的緩沖內(nèi)存,如果可用的內(nèi)存資源較為充足的話,可以將緩沖區(qū)的值設(shè)置大點,這樣會較少磁盤IO次數(shù).,如果合理調(diào)節(jié)該參數(shù),性能會提升1%~5%...? 可以設(shè)置為64K.
2. spark.reducer.maxSizeInFlight(默認(rèn)為48M)
該參數(shù)是stage的每一個task就需要將上一個stage的計算結(jié)果中的所有相同key秘通,從各個節(jié)點上通過網(wǎng)絡(luò)都拉取到自己所在的節(jié)點上敛熬,然后進行key的聚合或連接等操作,如果合理調(diào)節(jié)該參數(shù)(增大),性能會提升1%~5%...
3. spark.shuffle.io.maxRetries(默認(rèn)3次)
該參數(shù)是stage的task向上一個stage的task計算結(jié)果拉取數(shù)據(jù),也就是上面那個操作,有時候會因為網(wǎng)絡(luò)異常原因,導(dǎo)致拉取失敗,失敗時候默認(rèn)重新拉取三次,三次過還是失敗的話作業(yè)就執(zhí)行失敗了,根據(jù)具體的業(yè)務(wù)可以考慮將默認(rèn)值增大,這樣可以避免由于JVM的一些原因或者網(wǎng)絡(luò)不穩(wěn)定等因素導(dǎo)致的數(shù)據(jù)拉取失敗.也有助于提高spark作業(yè)的穩(wěn)定性. 可以適當(dāng)?shù)奶嵘匦吕〉拇螖?shù),最大為60次.
4. spark.shuffle.io.retryWait(默認(rèn)為5s)
該參數(shù)和上面一樣,是每次拉取數(shù)據(jù)的間隔時間...? 調(diào)優(yōu)建議:建議加大間隔時長(比如20s)应民,以增加shuffle操作的穩(wěn)定性
5. spark.shuffle.memoryFraction(默認(rèn)0.2,也就是20%)
該參數(shù)是數(shù)據(jù)根據(jù)不同的shuffle算子將數(shù)據(jù)寫入內(nèi)存結(jié)構(gòu)中,內(nèi)存結(jié)構(gòu)達到閾值會溢出臨時文件,這個參數(shù)就是則是內(nèi)存結(jié)構(gòu)的閾值百分比的,不是內(nèi)存結(jié)構(gòu)的內(nèi)存大小.? 如果內(nèi)存充足炸茧,而且很少使用持久化操作稿静,建議調(diào)高這個比例,可以減少頻繁對磁盤進行IO操作,合理調(diào)節(jié)該參數(shù)可以將性能提升10%左右.
6. spark.shuffle.manager(默認(rèn)sort)
該參數(shù)是設(shè)置shuffle的類型,默認(rèn)是sort,也就是sortshuffleManager, hash參數(shù)對應(yīng)HashShuffleManager, tungsten-sort參數(shù)對應(yīng)tungsten(這個很少用),HashShuffleManager是以前的版本,這個默認(rèn)就行,
7. spark.shuffle.sort.bypassMergeThreshold(默認(rèn)200個)
該參數(shù)是如果shuffle read task的數(shù)量小于等于200個的時候,在sortshufflemanager模式下,就會啟動ByPass sortshufflemanager...這個調(diào)優(yōu)就這樣把 ,默認(rèn)200挺好的.
8. spark.shuffle.consolidateFiles(默認(rèn)為false)
該參數(shù)只對HashshuffleManager有效,而HashshuffleManager是spark1.2之前默認(rèn)使用的版本...