spark shuffle v2

spark shuffle

Shuffle就是對數(shù)據(jù)進(jìn)行重組煤辨,由于分布式計算的特性和要求家妆,在實(shí)現(xiàn)細(xì)節(jié)上更加繁瑣和復(fù)雜
在MapReduce框架钠龙,Shuffle是連接Map和Reduce之間的橋梁蝗拿,Map階段通過shuffle讀取數(shù)據(jù)并輸出到對應(yīng)的Reduce;而Reduce階段負(fù)責(zé)從Map端拉取數(shù)據(jù)并進(jìn)行計算样屠。在整個shuffle過程中穿撮,往往伴隨著大量的磁盤和網(wǎng)絡(luò)I/O。所以shuffle性能的高低也直接決定了整個程序的性能高低痪欲。Spark也會有自己的shuffle實(shí)現(xiàn)過程悦穿。原文鏈接:https://blog.csdn.net/zhanglh046/article/details/78360762

總的來說,spark跟MR的shuffle并沒有多大區(qū)別业踢,都涉及到map(寫數(shù)據(jù)的階段)栗柒,跟reduce(讀數(shù)據(jù)階段)。

spark shuffle 執(zhí)行流程

本文通過源碼分析spark shuffle的執(zhí)行過程知举,以及相關(guān)參數(shù)的調(diào)優(yōu)瞬沦。

通過分析spark 提交的源碼太伊,我們可以知道,最終調(diào)用的是org.apache.spark.scheduler.TaskrunTask方法,而Task有2個子類逛钻,ShuffleMapTask(write(也可能存在先read后write,最后階段是write)相當(dāng)于MR中的Map階段)跟ResultTask(開始階段是read,相當(dāng)于MR中的Reduce階段)

image.png

shuffle write階段

image.png

查看ShuffleMapTask可以知道有3種Writer僚焦,這里我們只討論最常用的SortShuffleWriter

Shuffle write 的'HashMap' 跟'Array'

Shuffle write沒有使用常見的collection或者map绣的,而是用一個大數(shù)組,第一位存儲key欲账,key的下一位存儲value屡江,存儲的格式類都是K: (getPartition(key), key) V: value
其實(shí)現(xiàn)原理很簡單,開一個大 Object 數(shù)組赛不,藍(lán)色部分存儲 Key惩嘉,黑色部分存儲 Value。如下圖:

image.png

1.PartitionedAppendOnlyMap

PartitionedAppendOnlyMap extends AppendOnlyMap踢故, AppendOnlyMap的官方介紹是 A simple open hash table optimized for the append-only use case, where keys are never removed, but the value for each key may be changed文黎。意思是類似 HashMap,但沒有remove(key)方法殿较。

具體操作拿到數(shù)據(jù)后

  • 計算key的hash值的位置pos耸峭,2pos,如果2pos位置沒有數(shù)據(jù)淋纲,則在2pos的位置放入key劳闹,2pos +1 的位置放入value,如果2*pos上有值洽瞬,計算key是否等于本涕,如果等于,則用傳入的函數(shù)更新伙窃,如wordcount中的reduceByKey(_ + _)菩颖, 計算出新的value后更新,如果不等于为障,則通過(pos + delta) & mask 的方法重新計算hash值得位置晦闰,delta 從1開始,遇到key存在每次遞增1
  • 當(dāng)容量>growThreshold(0.7 * size)鳍怨,就是大于70%鹅髓,數(shù)組會擴(kuò)容,變?yōu)樵瓉淼?倍京景,然后重新計算原數(shù)據(jù)得每個值窿冯,寫入到新的數(shù)組中。
  • 每次插入后确徙,會判斷當(dāng)前大約容量醒串,通過估算得方式計算占用的內(nèi)存执桌,每32次估算一次,如果大于當(dāng)前的內(nèi)存芜赌,就會向taskMemoryManager.acquireExecutionMemory申請內(nèi)存仰挣,如果申請成功,則繼續(xù)寫入缠沈,如果寫入不成功膘壶,則spill磁盤,所以洲愤,第一個優(yōu)化點(diǎn)颓芭,理論上executor內(nèi)存越大,在內(nèi)存可存儲的數(shù)據(jù)越多柬赐,spill磁盤的次數(shù)越少亡问,速度越快。spill的過程肛宋,調(diào)用collection.destructiveSortedWritablePartitionedIterator(comparator)州藕,首先會將數(shù)據(jù)往前移動,填滿中間空缺的位置酝陈,然后將內(nèi)存中的數(shù)據(jù)進(jìn)行排序床玻,用的排序算法是TimSort,最后按照分區(qū)且排序的形式寫入文件中沉帮。

2.PartitionedPairBuffer

相對比較簡單笨枯,不需要mapCombine,只需要將數(shù)據(jù)按照kv追加到數(shù)組后面遇西,如下圖馅精。


image.png

spill溢寫磁盤與PartitionedAppendOnlyMap一樣,不過不需要移動數(shù)據(jù)粱檀,填充空缺的位置洲敢,數(shù)據(jù)本身就是緊密的。

Shuffle write 合并

通過PartitionedAppendOnlyMap或者PartitionedPairBuffer操作完所有的數(shù)據(jù)后茄蚯,會生成一個內(nèi)存collection和0個或者多個分區(qū)且排序的文件(如果數(shù)據(jù)量過大有spill操作)压彭,最后通過外排將內(nèi)存的數(shù)據(jù)跟spill的文件數(shù)據(jù),通過merge sort合并成1個分區(qū)且排序的大文件(shuffleId_mapId_0.data)渗常,跟一個索引文件(shuffleId_mapId_0.index),類似kafka里的segment跟.index文件壮不。索引主要描述每個分區(qū)對應(yīng)的數(shù)據(jù),比如0-100是0號分區(qū)皱碘,101-200是1號分區(qū)的數(shù)據(jù)询一,為了給reducer fetch對應(yīng)分區(qū)的數(shù)據(jù)。

shuffle read階段

shuffle read階段,其實(shí)就是讀取數(shù)據(jù)的階段健蕊,你可以理解成菱阵,client向server發(fā)送請求,下載數(shù)據(jù)缩功。主要是從client讀取數(shù)據(jù)的過程晴及,超時、并發(fā)度嫡锌、異常重試等方面入手虑稼,server端則通過調(diào)整處理的并發(fā)數(shù)方面入手。

Shuffle read過程

ResultTask調(diào)用runTask最終調(diào)用的是ShuffleRdd的compute方法势木,然后我們可以看到實(shí)際上是BlockStoreShuffleReader的read方法蛛倦。read方法中,通過new ShuffleBlockFetcherIterator(), 注意這里有4個優(yōu)化參數(shù), new ShuffleBlockFetcherIterator的時候跟压,通過mapOutputTracker獲取屬于自己的Iterator[(BlockManagerId, Seq[(BlockId, Long)])]胰蝠,然后ShuffleBlockFetcherIterator的initialize方法歼培,是整個read切分讀request的邏輯震蒋。

new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      serializerManager.wrapStream,
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
      SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
      SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
      SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

ShuffleBlockFetcherIterator.initialize

首先將val targetRequestSize = math.max(maxBytesInFlight / 5, 1L),默認(rèn)是48m/5=9.6m躲庄。判斷拉取的數(shù)據(jù)是否大于9.6m或者一個address拉取的blocks數(shù)大于maxBlocksInFlightPerAddress(默認(rèn)是Int.MaxValue),所以只由9.6m控制查剖,如果是,則封裝成一個new FetchRequest(address, curBlocks)噪窘,最后會封裝成N個FetchRequest笋庄。然后開始遍歷拉數(shù)據(jù),判斷是否isRemoteBlockFetchable倔监,邏輯是

    def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): Boolean = {
      fetchReqQueue.nonEmpty &&
        (bytesInFlight == 0 ||
          (reqsInFlight + 1 <= maxReqsInFlight &&
            bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
    }

總之就是正在拉取的數(shù)據(jù)不能大于spark.reducer.maxSizeInFlight(默認(rèn)48m)并且請求數(shù)不能超過maxReqsInFlight(Int.MaxValue),不然就進(jìn)入等待的deferredFetchRequests隊列直砂。所以,為了提高shuffle read的request并發(fā)讀的數(shù)量浩习,可以提高maxBytesInFlight(默認(rèn)48m)的大小静暂。并且單個address拉取的blocks數(shù)不能超過maxBlocksInFlightPerAddress(默認(rèn)是Int.MaxValue),所以谱秽,降低maxBlocksInFlightPerAddress可以降低同時拉取的blocks數(shù)量洽蛀,防止同時拉取多個blocks導(dǎo)致io過高,導(dǎo)致服務(wù)無響應(yīng)疟赊、io超時等異常郊供。
最終執(zhí)行sendRequest請求拉取數(shù)據(jù),調(diào)優(yōu)參數(shù)spark.shuffle.io.maxRetries(默認(rèn)是3)拉取失敗重試的次數(shù)近哟, spark.shuffle.io.retryWait(默認(rèn)是5秒)失敗后等待5秒后嘗試重新拉取數(shù)據(jù)驮审。

spark shuffle 參數(shù)調(diào)優(yōu)

為了進(jìn)一步優(yōu)化內(nèi)存的使用以及提高Shuffle時排序的效率,Spark引入了堆外(Off-heap)內(nèi)存,使之可以直接在工作節(jié)點(diǎn)的系統(tǒng)內(nèi)存中開辟空間头岔,存儲經(jīng)過序列化的二進(jìn)制數(shù)據(jù)塔拳。除了沒有other空間,堆外內(nèi)存與堆內(nèi)內(nèi)存的劃分方式相同峡竣,所有運(yùn)行中的并發(fā)任務(wù)共享存儲內(nèi)存和執(zhí)行內(nèi)存靠抑。所以,開啟堆外內(nèi)存對于調(diào)優(yōu)非常重要

  • spark.shuffle.io.preferDirectBufs:是否優(yōu)先使用堆外內(nèi)存
  • spark.memory.offHeap.enabled: 是否啟用堆外內(nèi)存
  • spark.memory.offHeap.size: 設(shè)置堆外內(nèi)存大小

shuffle write階段參數(shù)調(diào)優(yōu)

  • spark.executor.memory:通過分析write的過程中可以知道适掰,單個task可用的內(nèi)存越大颂碧,可申請的內(nèi)存越大,spill disk的次數(shù)越少类浪,速度越快载城,所以,可以適當(dāng)提高該參數(shù)
  • spark.sql.shuffle.partitions:提高并行度可以減少單個task處理的數(shù)據(jù)量费就,減少spill disk次數(shù)诉瓦,降低oom風(fēng)險,但是并不是越大越好力细,提高該參數(shù)睬澡,會增加task的數(shù)量,跟線程的數(shù)量一個道理眠蚂,到達(dá)一定閾值煞聪,線程數(shù)的越多反而會增加系統(tǒng)上下文切換的壓力,需要一點(diǎn)點(diǎn)測試逝慧,根據(jù)不同的任務(wù)昔脯,確定具體的數(shù)據(jù)

shuffle read階段參數(shù)調(diào)優(yōu)

  • spark.reducer.maxSizeInFlight:默認(rèn)是48m,一個請求拉取一個塊的數(shù)據(jù)為48/5=9.6m,理想情況下會有5個請求同時拉數(shù)據(jù)笛臣,但是可能遇到一個大塊云稚,超過48m,就只有一個請求在拉數(shù)據(jù)沈堡,無法并行静陈,所以可用適當(dāng)提高該參數(shù)
  • spark.reducer.maxReqsInFlight:shuffle read的時候最多有多少個請求同時拉取數(shù)據(jù),默認(rèn)是Integer.MAX_VALUE踱蛀,一般不優(yōu)化窿给,不修改
  • spark.reducer.maxBlocksInFlightPerAddress: 一個拉取的請求,包含多少個server率拒,默認(rèn)一個請求是9.6m崩泡,但是可能每個server拉取的文件非常小,只有幾k猬膨,那樣一個請求就需要請求上千個server拉取數(shù)據(jù)角撞,容易導(dǎo)致超時等異常呛伴,所以,適當(dāng)降低該參數(shù)
  • spark.reducer.maxReqSizeShuffleToMem:read 過程中內(nèi)存可以存放最大的數(shù)據(jù)量谒所,超過將會把拉取的數(shù)據(jù)放到磁盤
  • spark.shuffle.io.maxRetries:一個請求拉取失敗時重試次數(shù)热康,增大該參數(shù),可能會延遲任務(wù)執(zhí)行時間劣领,但是可以提高任務(wù)成功率
  • spark.shuffle.io.retryWait:一個請求拉取失敗時的等待時間姐军,增大該參數(shù),可能會延遲任務(wù)執(zhí)行時間尖淘,但是可以提高任務(wù)成功率
  • spark.shuffle.io.clientThreads: 拉取數(shù)據(jù)client的線程個數(shù), 可適當(dāng)調(diào)高
  • spark.shuffle.file.buffer: write spill磁盤的時候奕锌,緩沖區(qū)大小

ExternalShuffleService

Spark 的 Executor 節(jié)點(diǎn)不僅負(fù)責(zé)數(shù)據(jù)的計算,還涉及到數(shù)據(jù)的管理村生。如果發(fā)生了 shuffle 操作惊暴,Executor 節(jié)點(diǎn)不僅需要生成 shuffle 數(shù)據(jù),還需要負(fù)責(zé)處理讀取請求趁桃。如果 一個 Executor 節(jié)點(diǎn)掛掉了辽话,那么它也就無法處理 shuffle 的數(shù)據(jù)讀取請求了,它之前生成的數(shù)據(jù)都沒有意義了卫病。

為了解耦數(shù)據(jù)計算和數(shù)據(jù)讀取服務(wù)油啤,Spark 支持單獨(dú)的服務(wù)來處理讀取請求。這個單獨(dú)的服務(wù)叫做 ExternalShuffleService忽肛,運(yùn)行在每臺主機(jī)上村砂,管理該主機(jī)的所有 Executor 節(jié)點(diǎn)生成的 shuffle 數(shù)據(jù)烂斋。有讀者可能會想到性能問題屹逛,因?yàn)橹笆怯啥鄠€ Executor 負(fù)責(zé)處理讀取請求,而現(xiàn)在一臺主機(jī)只有一個 ExternalShuffleService 處理請求汛骂,其實(shí)性能問題不必?fù)?dān)心罕模,因?yàn)樗饕拇疟P和網(wǎng)絡(luò),而且采用的是異步讀取帘瞭,所以并不會有性能影響淑掌。

解耦之后,如果 Executor 在數(shù)據(jù)計算時不小心掛掉蝶念,也不會影響 shuffle 數(shù)據(jù)的讀取抛腕。而且Spark 還可以實(shí)現(xiàn)動態(tài)分配,動態(tài)分配是指空閑的 Executor 可以及時釋放掉媒殉。


image.png

ExternalShuffleService參數(shù)調(diào)優(yōu)

image.png

ExternalShuffleService本質(zhì)是一個基于Netty寫的Netty服務(wù)担敌,所以相關(guān)調(diào)優(yōu)就是對Netty參數(shù)的調(diào)優(yōu),主要有以下這些參數(shù)廷蓉,具體調(diào)整全封,需要根據(jù)實(shí)際情況做出相應(yīng)的調(diào)整,提高服務(wù)穩(wěn)定性。

// 服務(wù)啟動時處理請求的線程數(shù)刹悴,默認(rèn)是服務(wù)器的cores * 2
spark.shuffle.io.serverThreads
// ChannelOption.SO_RCVBUF,
spark.shuffle.io.receiveBuffer
// ChannelOption.SO_BACKLOG
spark.shuffle.io.backLog
// ChannelOption.SO_SNDBUF
spark.shuffle.io.sendBuffer

實(shí)際應(yīng)用的效果

image.png

T: 使用的內(nèi)存 1T=1024G
P: 配置spark.sql.shuffle.partitions行楞,1P=1000
C: cpu cores數(shù)量

參考鏈接
https://blog.csdn.net/zhanglh046/article/details/78360762
https://github.com/JerryLead/SparkInternals
https://www.cnblogs.com/itboys/p/9201750.html
https://www.dazhuanlan.com/2019/12/19/5dfb2a10d780d/
https://blog.csdn.net/pre_tender/article/details/101517789

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市土匀,隨后出現(xiàn)的幾起案子子房,更是在濱河造成了極大的恐慌,老刑警劉巖就轧,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件池颈,死亡現(xiàn)場離奇詭異,居然都是意外死亡钓丰,警方通過查閱死者的電腦和手機(jī)躯砰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來携丁,“玉大人琢歇,你說我怎么就攤上這事∶渭” “怎么了李茫?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長肥橙。 經(jīng)常有香客問我魄宏,道長,這世上最難降的妖魔是什么存筏? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任宠互,我火速辦了婚禮,結(jié)果婚禮上椭坚,老公的妹妹穿的比我還像新娘予跌。我一直安慰自己,他們只是感情好善茎,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布券册。 她就那樣靜靜地躺著,像睡著了一般垂涯。 火紅的嫁衣襯著肌膚如雪烁焙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天耕赘,我揣著相機(jī)與錄音骄蝇,去河邊找鬼。 笑死鞠苟,一個胖子當(dāng)著我的面吹牛乞榨,可吹牛的內(nèi)容都是我干的秽之。 我是一名探鬼主播,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼吃既,長吁一口氣:“原來是場噩夢啊……” “哼考榨!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鹦倚,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤河质,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后震叙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體掀鹅,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年媒楼,在試婚紗的時候發(fā)現(xiàn)自己被綠了乐尊。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡划址,死狀恐怖扔嵌,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情夺颤,我是刑警寧澤痢缎,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站世澜,受9級特大地震影響独旷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜寥裂,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一嵌洼、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧抚恒,春花似錦咱台、人聲如沸络拌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽春贸。三九已至混萝,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間萍恕,已是汗流浹背逸嘀。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留允粤,地道東北人崭倘。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓惦银,卻偏偏與公主長得像蹦浦,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容