spark shuffle
Shuffle就是對數(shù)據(jù)進(jìn)行重組煤辨,由于分布式計算的特性和要求家妆,在實(shí)現(xiàn)細(xì)節(jié)上更加繁瑣和復(fù)雜
在MapReduce框架钠龙,Shuffle是連接Map和Reduce之間的橋梁蝗拿,Map階段通過shuffle讀取數(shù)據(jù)并輸出到對應(yīng)的Reduce;而Reduce階段負(fù)責(zé)從Map端拉取數(shù)據(jù)并進(jìn)行計算样屠。在整個shuffle過程中穿撮,往往伴隨著大量的磁盤和網(wǎng)絡(luò)I/O。所以shuffle性能的高低也直接決定了整個程序的性能高低痪欲。Spark也會有自己的shuffle實(shí)現(xiàn)過程悦穿。原文鏈接:https://blog.csdn.net/zhanglh046/article/details/78360762
總的來說,spark跟MR的shuffle并沒有多大區(qū)別业踢,都涉及到map(寫數(shù)據(jù)的階段)栗柒,跟reduce(讀數(shù)據(jù)階段)。
spark shuffle 執(zhí)行流程
本文通過源碼分析spark shuffle的執(zhí)行過程知举,以及相關(guān)參數(shù)的調(diào)優(yōu)瞬沦。
通過分析spark 提交的源碼太伊,我們可以知道,最終調(diào)用的是org.apache.spark.scheduler.Task
的runTask
方法,而Task有2個子類逛钻,ShuffleMapTask
(write(也可能存在先read后write,最后階段是write)相當(dāng)于MR中的Map階段)跟ResultTask
(開始階段是read,相當(dāng)于MR中的Reduce階段)
shuffle write階段
查看ShuffleMapTask
可以知道有3種Writer僚焦,這里我們只討論最常用的SortShuffleWriter
。
Shuffle write 的'HashMap' 跟'Array'
Shuffle write沒有使用常見的collection或者map绣的,而是用一個大數(shù)組,第一位存儲key欲账,key的下一位存儲value屡江,存儲的格式類都是K: (getPartition(key), key) V: value
其實(shí)現(xiàn)原理很簡單,開一個大 Object 數(shù)組赛不,藍(lán)色部分存儲 Key惩嘉,黑色部分存儲 Value。如下圖:
1.PartitionedAppendOnlyMap
PartitionedAppendOnlyMap extends AppendOnlyMap踢故, AppendOnlyMap的官方介紹是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed文黎。意思是類似 HashMap,但沒有remove(key)方法殿较。
具體操作拿到數(shù)據(jù)后
- 計算key的hash值的位置pos耸峭,2pos,如果2pos位置沒有數(shù)據(jù)淋纲,則在2pos的位置放入key劳闹,2pos +1 的位置放入value,如果2*pos上有值洽瞬,計算key是否等于本涕,如果等于,則用傳入的函數(shù)更新伙窃,如wordcount中的reduceByKey(_ + _)菩颖, 計算出新的value后更新,如果不等于为障,則通過(pos + delta) & mask 的方法重新計算hash值得位置晦闰,delta 從1開始,遇到key存在每次遞增1
- 當(dāng)容量>growThreshold(0.7 * size)鳍怨,就是大于70%鹅髓,數(shù)組會擴(kuò)容,變?yōu)樵瓉淼?倍京景,然后重新計算原數(shù)據(jù)得每個值窿冯,寫入到新的數(shù)組中。
- 每次插入后确徙,會判斷當(dāng)前大約容量醒串,通過估算得方式計算占用的內(nèi)存执桌,每32次估算一次,如果大于當(dāng)前的內(nèi)存芜赌,就會向taskMemoryManager.acquireExecutionMemory申請內(nèi)存仰挣,如果申請成功,則繼續(xù)寫入缠沈,如果寫入不成功膘壶,則spill磁盤,所以洲愤,第一個優(yōu)化點(diǎn)颓芭,理論上executor內(nèi)存越大,在內(nèi)存可存儲的數(shù)據(jù)越多柬赐,spill磁盤的次數(shù)越少亡问,速度越快。spill的過程肛宋,調(diào)用collection.destructiveSortedWritablePartitionedIterator(comparator)州藕,首先會將數(shù)據(jù)往前移動,填滿中間空缺的位置酝陈,然后將內(nèi)存中的數(shù)據(jù)進(jìn)行排序床玻,用的排序算法是TimSort,最后按照分區(qū)且排序的形式寫入文件中沉帮。
2.PartitionedPairBuffer
相對比較簡單笨枯,不需要mapCombine,只需要將數(shù)據(jù)按照kv追加到數(shù)組后面遇西,如下圖馅精。
spill溢寫磁盤與PartitionedAppendOnlyMap一樣,不過不需要移動數(shù)據(jù)粱檀,填充空缺的位置洲敢,數(shù)據(jù)本身就是緊密的。
Shuffle write 合并
通過PartitionedAppendOnlyMap或者PartitionedPairBuffer操作完所有的數(shù)據(jù)后茄蚯,會生成一個內(nèi)存collection和0個或者多個分區(qū)且排序的文件(如果數(shù)據(jù)量過大有spill操作)压彭,最后通過外排將內(nèi)存的數(shù)據(jù)跟spill的文件數(shù)據(jù),通過merge sort合并成1個分區(qū)且排序的大文件(shuffleId_mapId_0.data)渗常,跟一個索引文件(shuffleId_mapId_0.index),類似kafka里的segment跟.index文件壮不。索引主要描述每個分區(qū)對應(yīng)的數(shù)據(jù),比如0-100是0號分區(qū)皱碘,101-200是1號分區(qū)的數(shù)據(jù)询一,為了給reducer fetch對應(yīng)分區(qū)的數(shù)據(jù)。
shuffle read階段
shuffle read階段,其實(shí)就是讀取數(shù)據(jù)的階段健蕊,你可以理解成菱阵,client向server發(fā)送請求,下載數(shù)據(jù)缩功。主要是從client讀取數(shù)據(jù)的過程晴及,超時、并發(fā)度嫡锌、異常重試等方面入手虑稼,server端則通過調(diào)整處理的并發(fā)數(shù)方面入手。
Shuffle read過程
ResultTask調(diào)用runTask最終調(diào)用的是ShuffleRdd的compute方法势木,然后我們可以看到實(shí)際上是BlockStoreShuffleReader的read方法蛛倦。read方法中,通過new ShuffleBlockFetcherIterator(), 注意這里有4個優(yōu)化參數(shù), new ShuffleBlockFetcherIterator的時候跟压,通過mapOutputTracker獲取屬于自己的Iterator[(BlockManagerId, Seq[(BlockId, Long)])]胰蝠,然后ShuffleBlockFetcherIterator的initialize方法歼培,是整個read切分讀request的邏輯震蒋。
new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
ShuffleBlockFetcherIterator.initialize
首先將val targetRequestSize = math.max(maxBytesInFlight / 5, 1L),默認(rèn)是48m/5=9.6m躲庄。判斷拉取的數(shù)據(jù)是否大于9.6m或者一個address拉取的blocks數(shù)大于maxBlocksInFlightPerAddress(默認(rèn)是Int.MaxValue),所以只由9.6m控制查剖,如果是,則封裝成一個new FetchRequest(address, curBlocks)噪窘,最后會封裝成N個FetchRequest笋庄。然后開始遍歷拉數(shù)據(jù),判斷是否isRemoteBlockFetchable倔监,邏輯是
def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): Boolean = {
fetchReqQueue.nonEmpty &&
(bytesInFlight == 0 ||
(reqsInFlight + 1 <= maxReqsInFlight &&
bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
}
總之就是正在拉取的數(shù)據(jù)不能大于spark.reducer.maxSizeInFlight(默認(rèn)48m)并且請求數(shù)不能超過maxReqsInFlight(Int.MaxValue),不然就進(jìn)入等待的deferredFetchRequests隊列直砂。所以,為了提高shuffle read的request并發(fā)讀的數(shù)量浩习,可以提高maxBytesInFlight(默認(rèn)48m)的大小静暂。并且單個address拉取的blocks數(shù)不能超過maxBlocksInFlightPerAddress(默認(rèn)是Int.MaxValue),所以谱秽,降低maxBlocksInFlightPerAddress可以降低同時拉取的blocks數(shù)量洽蛀,防止同時拉取多個blocks導(dǎo)致io過高,導(dǎo)致服務(wù)無響應(yīng)疟赊、io超時等異常郊供。
最終執(zhí)行sendRequest請求拉取數(shù)據(jù),調(diào)優(yōu)參數(shù)spark.shuffle.io.maxRetries(默認(rèn)是3)拉取失敗重試的次數(shù)近哟, spark.shuffle.io.retryWait(默認(rèn)是5秒)失敗后等待5秒后嘗試重新拉取數(shù)據(jù)驮审。
spark shuffle 參數(shù)調(diào)優(yōu)
為了進(jìn)一步優(yōu)化內(nèi)存的使用以及提高Shuffle時排序的效率,Spark引入了堆外(Off-heap)內(nèi)存,使之可以直接在工作節(jié)點(diǎn)的系統(tǒng)內(nèi)存中開辟空間头岔,存儲經(jīng)過序列化的二進(jìn)制數(shù)據(jù)塔拳。除了沒有other空間,堆外內(nèi)存與堆內(nèi)內(nèi)存的劃分方式相同峡竣,所有運(yùn)行中的并發(fā)任務(wù)共享存儲內(nèi)存和執(zhí)行內(nèi)存靠抑。所以,開啟堆外內(nèi)存對于調(diào)優(yōu)非常重要
-
spark.shuffle.io.preferDirectBufs
:是否優(yōu)先使用堆外內(nèi)存 -
spark.memory.offHeap.enabled
: 是否啟用堆外內(nèi)存 -
spark.memory.offHeap.size
: 設(shè)置堆外內(nèi)存大小
shuffle write階段參數(shù)調(diào)優(yōu)
-
spark.executor.memory
:通過分析write的過程中可以知道适掰,單個task可用的內(nèi)存越大颂碧,可申請的內(nèi)存越大,spill disk的次數(shù)越少类浪,速度越快载城,所以,可以適當(dāng)提高該參數(shù) -
spark.sql.shuffle.partitions
:提高并行度可以減少單個task處理的數(shù)據(jù)量费就,減少spill disk次數(shù)诉瓦,降低oom風(fēng)險,但是并不是越大越好力细,提高該參數(shù)睬澡,會增加task的數(shù)量,跟線程的數(shù)量一個道理眠蚂,到達(dá)一定閾值煞聪,線程數(shù)的越多反而會增加系統(tǒng)上下文切換的壓力,需要一點(diǎn)點(diǎn)測試逝慧,根據(jù)不同的任務(wù)昔脯,確定具體的數(shù)據(jù)
shuffle read階段參數(shù)調(diào)優(yōu)
-
spark.reducer.maxSizeInFlight
:默認(rèn)是48m,一個請求拉取一個塊的數(shù)據(jù)為48/5=9.6m,理想情況下會有5個請求同時拉數(shù)據(jù)笛臣,但是可能遇到一個大塊云稚,超過48m,就只有一個請求在拉數(shù)據(jù)沈堡,無法并行静陈,所以可用適當(dāng)提高該參數(shù) -
spark.reducer.maxReqsInFlight
:shuffle read的時候最多有多少個請求同時拉取數(shù)據(jù),默認(rèn)是Integer.MAX_VALUE踱蛀,一般不優(yōu)化窿给,不修改 -
spark.reducer.maxBlocksInFlightPerAddress
: 一個拉取的請求,包含多少個server率拒,默認(rèn)一個請求是9.6m崩泡,但是可能每個server拉取的文件非常小,只有幾k猬膨,那樣一個請求就需要請求上千個server拉取數(shù)據(jù)角撞,容易導(dǎo)致超時等異常呛伴,所以,適當(dāng)降低該參數(shù) -
spark.reducer.maxReqSizeShuffleToMem
:read 過程中內(nèi)存可以存放最大的數(shù)據(jù)量谒所,超過將會把拉取的數(shù)據(jù)放到磁盤 -
spark.shuffle.io.maxRetries
:一個請求拉取失敗時重試次數(shù)热康,增大該參數(shù),可能會延遲任務(wù)執(zhí)行時間劣领,但是可以提高任務(wù)成功率 -
spark.shuffle.io.retryWait
:一個請求拉取失敗時的等待時間姐军,增大該參數(shù),可能會延遲任務(wù)執(zhí)行時間尖淘,但是可以提高任務(wù)成功率 -
spark.shuffle.io.clientThreads
: 拉取數(shù)據(jù)client的線程個數(shù), 可適當(dāng)調(diào)高 -
spark.shuffle.file.buffer
: write spill磁盤的時候奕锌,緩沖區(qū)大小
ExternalShuffleService
Spark 的 Executor 節(jié)點(diǎn)不僅負(fù)責(zé)數(shù)據(jù)的計算,還涉及到數(shù)據(jù)的管理村生。如果發(fā)生了 shuffle 操作惊暴,Executor 節(jié)點(diǎn)不僅需要生成 shuffle 數(shù)據(jù),還需要負(fù)責(zé)處理讀取請求趁桃。如果 一個 Executor 節(jié)點(diǎn)掛掉了辽话,那么它也就無法處理 shuffle 的數(shù)據(jù)讀取請求了,它之前生成的數(shù)據(jù)都沒有意義了卫病。
為了解耦數(shù)據(jù)計算和數(shù)據(jù)讀取服務(wù)油啤,Spark 支持單獨(dú)的服務(wù)來處理讀取請求。這個單獨(dú)的服務(wù)叫做 ExternalShuffleService忽肛,運(yùn)行在每臺主機(jī)上村砂,管理該主機(jī)的所有 Executor 節(jié)點(diǎn)生成的 shuffle 數(shù)據(jù)烂斋。有讀者可能會想到性能問題屹逛,因?yàn)橹笆怯啥鄠€ Executor 負(fù)責(zé)處理讀取請求,而現(xiàn)在一臺主機(jī)只有一個 ExternalShuffleService 處理請求汛骂,其實(shí)性能問題不必?fù)?dān)心罕模,因?yàn)樗饕拇疟P和網(wǎng)絡(luò),而且采用的是異步讀取帘瞭,所以并不會有性能影響淑掌。
解耦之后,如果 Executor 在數(shù)據(jù)計算時不小心掛掉蝶念,也不會影響 shuffle 數(shù)據(jù)的讀取抛腕。而且Spark 還可以實(shí)現(xiàn)動態(tài)分配,動態(tài)分配是指空閑的 Executor 可以及時釋放掉媒殉。
ExternalShuffleService參數(shù)調(diào)優(yōu)
ExternalShuffleService本質(zhì)是一個基于Netty寫的Netty服務(wù)担敌,所以相關(guān)調(diào)優(yōu)就是對Netty參數(shù)的調(diào)優(yōu),主要有以下這些參數(shù)廷蓉,具體調(diào)整全封,需要根據(jù)實(shí)際情況做出相應(yīng)的調(diào)整,提高服務(wù)穩(wěn)定性。
// 服務(wù)啟動時處理請求的線程數(shù)刹悴,默認(rèn)是服務(wù)器的cores * 2
spark.shuffle.io.serverThreads
// ChannelOption.SO_RCVBUF,
spark.shuffle.io.receiveBuffer
// ChannelOption.SO_BACKLOG
spark.shuffle.io.backLog
// ChannelOption.SO_SNDBUF
spark.shuffle.io.sendBuffer
實(shí)際應(yīng)用的效果
T: 使用的內(nèi)存 1T=1024G
P: 配置spark.sql.shuffle.partitions行楞,1P=1000
C: cpu cores數(shù)量
參考鏈接
https://blog.csdn.net/zhanglh046/article/details/78360762
https://github.com/JerryLead/SparkInternals
https://www.cnblogs.com/itboys/p/9201750.html
https://www.dazhuanlan.com/2019/12/19/5dfb2a10d780d/
https://blog.csdn.net/pre_tender/article/details/101517789