Facebook 60TB+級(jí)的Apache Spark應(yīng)用案例 里大體有兩方面的PR,一個(gè)是Bug Fix麻汰,一個(gè)是性能優(yōu)化坯钦。這篇文章會(huì)對(duì)所有提及的Bug Issue進(jìn)行一次解釋和說(shuō)明真仲。也請(qǐng)期待下一篇。
前言
Facebook 60TB+級(jí)的Apache Spark應(yīng)用案例,本來(lái)上周就準(zhǔn)備看的戳护,而且要求自己不能手機(jī)看金抡,要在電腦上細(xì)細(xì)的看。然而終究是各種忙拖到了昨天晚上腌且。
文章體現(xiàn)的工作梗肝,我覺得更像是一次挑戰(zhàn)賽,F(xiàn)acebook團(tuán)隊(duì)通過(guò)層層加碼铺董,最終將單個(gè)Spark Batch實(shí)例跑到了60T+ 的數(shù)據(jù)巫击,這是一個(gè)了不起的成就,最最重要的是精续,他們完成這項(xiàng)挑戰(zhàn)賽后給社區(qū)帶來(lái)了三個(gè)好處:
- 在如此規(guī)模下坝锰,發(fā)現(xiàn)了一些Spark團(tuán)隊(duì)以前很難發(fā)現(xiàn)的Bug
- 提交了大量的bug fix 和 new features,而且我們可以在Spark 1.6.2 /Spark 2.0 里享受到其中的成果
- 在如此規(guī)模下,我們也知道我們最可能遇到的一些問(wèn)題重付。大體是OOM和Driver的限制顷级。
說(shuō)實(shí)在的,我覺得這篇文章堪夭,可以算是一篇工程論文了愕把。而且只用了三個(gè)人力,不知道一共花了多久森爽。
值得注意的是恨豁,大部分Bug都是和OOM相關(guān)的,這也是Spark的一個(gè)痛點(diǎn)爬迟,所以這次提交的PR質(zhì)量非常高橘蜜。
Bug 剖析
Make PipedRDD robust to fetch failure SPARK-13793
這個(gè)Issue 還是比較明顯的。PipedRDD 在Task內(nèi)部啟動(dòng)一個(gè)新的Java進(jìn)程(假設(shè)我們叫做ChildProcessor)獲取數(shù)據(jù)付呕。這里就會(huì)涉及到三個(gè)點(diǎn):
- 啟動(dòng)一個(gè)線程往 ChildProcessor 寫數(shù)據(jù) (stdin writer)
- 啟動(dòng)一個(gè)線程監(jiān)控ChildProcessor的錯(cuò)誤輸出 (stderr reader)
- 獲取ChildProcessor輸入流计福,返回一個(gè)迭代器(Iterator)
既然都是讀取數(shù)據(jù)流,如果數(shù)據(jù)流因?yàn)槟撤N異常原因關(guān)閉徽职,那必然會(huì)拋出錯(cuò)誤象颖。所以我們需要記錄這個(gè)異常,對(duì)于1,2 兩個(gè)我們只要catch住異常姆钉,然后將異常記錄下來(lái)方便后續(xù)重新拋出说订。 那么什么時(shí)候拋出呢?迭代器有經(jīng)典的hasNext/next方法,每次hasNext時(shí)潮瓶,我們都檢查下是否有Exception(來(lái)自1,2的)陶冷,如果有就拋出了。既然已經(jīng)異常了毯辅,我們就應(yīng)該不需要繼續(xù)讀取這個(gè)分區(qū)的數(shù)據(jù)了埂伦。否則數(shù)據(jù)集很大的情況下,還要運(yùn)行很長(zhǎng)時(shí)間才能運(yùn)行完思恐。
在hasNext 為false的情況下沾谜,有兩類情況膊毁,一類是真的沒有數(shù)據(jù)了,一類是有異常了类早,比如有節(jié)點(diǎn)掛了媚媒,所以需要檢測(cè)下ChildProcessor的exitStatus狀態(tài)。如果不正常涩僻,就直接拋出異常缭召,進(jìn)行重試。
對(duì)于1逆日,2兩點(diǎn)嵌巷,原來(lái)都是沒有的昭卓,是這次Facebook團(tuán)隊(duì)加上去的耕赘。
Configurable max number of fetch failures SPARK-13369
截止到我這篇文章發(fā)出,這個(gè)Issue 并沒有被接收摊崭。
我們知道坪圾,Shuffle 發(fā)生時(shí)晓折,一般會(huì)發(fā)生有兩個(gè)Stage 產(chǎn)生,一個(gè)ShuffleMapStage (我們?nèi)∶麨?MapStage),他會(huì)寫入數(shù)據(jù)到文件中兽泄,接著下一個(gè)Stage (我們?nèi)∶麨镽educeStage) 就會(huì)去讀取對(duì)應(yīng)的數(shù)據(jù)漓概。 很多情況下,ReduceStage 去讀取數(shù)據(jù)MapStage 的數(shù)據(jù)會(huì)失敗病梢,可能的原因比如有節(jié)點(diǎn)重啟導(dǎo)致MapStage產(chǎn)生的數(shù)據(jù)有丟失胃珍,此外還有GC超時(shí)等。這個(gè)時(shí)候Spark 就會(huì)重跑這兩個(gè)Stage,如果連續(xù)四次都發(fā)生這個(gè)問(wèn)題蜓陌,那么就會(huì)將整個(gè)Job給標(biāo)記為失敗觅彰。 現(xiàn)階段(包括在剛發(fā)布的2.0),這個(gè)數(shù)值是固定的钮热,并不能夠設(shè)置填抬。
@markhamstra 給出的質(zhì)疑是,如果發(fā)生節(jié)點(diǎn)失敗導(dǎo)致Stage 重新被Resubmit ,Resubmit后理論上不會(huì)再嘗試原來(lái)失敗的節(jié)點(diǎn)隧期,如果連續(xù)四次都無(wú)法找到正常的階段運(yùn)行這些任務(wù)痴奏,那么應(yīng)該是有Bug,簡(jiǎn)單增加重試次數(shù)雖然也有意義厌秒,但是治標(biāo)不治本。
我個(gè)人認(rèn)為在集群規(guī)模較大擅憔,任務(wù)較重的過(guò)程中鸵闪,出現(xiàn)一個(gè)或者一批Node 掛掉啥的是很正常的,如果僅僅是因?yàn)槟硞€(gè)Shuffle 導(dǎo)致整個(gè)Job失敗暑诸,對(duì)于那種大而耗時(shí)的任務(wù)顯然是不能接受的蚌讼。個(gè)人認(rèn)為應(yīng)該講這個(gè)決定權(quán)交給用戶辟灰,也就是允許用戶配置嘗試次數(shù)。
Unresponsive driver SPARK-13279
這個(gè)Bug已經(jīng)在1.6.1, 2.0.0 中修復(fù)篡石。 這個(gè)場(chǎng)景比較特殊芥喇,因?yàn)镕acebook產(chǎn)生了高達(dá)200k的task數(shù),原來(lái)給pendingTasksForExecutor:HashMap[String, ArrayBuffer[Int]]
添加新的task 的時(shí)候凰萨,都會(huì)根據(jù)Executor名獲取到已經(jīng)存在的列表继控,然后判斷該列表是否已經(jīng)包含了新Task,這個(gè)操作的時(shí)間復(fù)雜度是O(N^2)。在Task數(shù)比較小的情況下沒啥問(wèn)題胖眷,但是一旦task數(shù)達(dá)到了200k,基本就要五分鐘武通,給人的感覺就是Driver沒啥反應(yīng)了。
而且在實(shí)際運(yùn)行任務(wù)的過(guò)程中珊搀,會(huì)通過(guò)一個(gè)特殊的dequeueTaskFromList結(jié)構(gòu)來(lái)排除掉已經(jīng)運(yùn)行的任務(wù)冶忱,所以我們其實(shí)在addPendingTask 過(guò)程中不需要做這個(gè)檢測(cè)。
因?yàn)樽C明了沒有副作用境析,所以現(xiàn)在是沒啥問(wèn)題了囚枪。但是我個(gè)人認(rèn)為其實(shí)還有一種辦法是,取一個(gè)閾值劳淆,如果小于某個(gè)閾值則做double duplicate check,否則就直接加進(jìn)去就好了链沼。Spark 在很多地方也是這么做的。
這里對(duì)于那些Task數(shù)特別大的朋友有福了憔儿。
TimSort issue due to integer overflow for large buffer
該Bug在1.6.2, 2.0.0 已經(jīng)被解決忆植。這個(gè)bug引起的問(wèn)題現(xiàn)象初看起來(lái)會(huì)比較讓人費(fèi)解,大體如下:
如圖所示似乎違反了簽名谒臼。其實(shí)問(wèn)題本身確實(shí)比較復(fù)雜朝刊,通過(guò)提交了兩個(gè)patch 才解決了該問(wèn)題。
一開始Facebook的哥們覺得應(yīng)該是排序過(guò)程中內(nèi)存的數(shù)據(jù)(比如ShuffleExternalSorter等Sorter) 超過(guò)8G 引起的蜈缤,所以限制了數(shù)量拾氓,大于一定數(shù)量之后就進(jìn)行spill操作。 后面一個(gè)新的PR應(yīng)該是發(fā)現(xiàn)了問(wèn)題的根源底哥,在UnsafeSortDataFormat.copyRange() 和ShuffleSortDataFormat copyRange() 里咙鞍,里面數(shù)組的偏移量是Integer類型,雖然數(shù)據(jù)集的大小不至于超過(guò)Int的最大值趾徽,但是在特定數(shù)據(jù)分布下且數(shù)據(jù)集>268.43 million 并則會(huì)觸發(fā)這個(gè)Bug续滋。我看了下,原先 Platform.copyMemory 簽名本身也是Long的孵奶,但是實(shí)現(xiàn)copyRange的時(shí)候疲酌,默認(rèn)傳進(jìn)去的是Int,所以產(chǎn)生了這個(gè)問(wèn)題。大家瞅一眼代碼就知道了。
Fix Spark executor OOM
該Bug 也是在1.6.2, 2.0.0 被修正朗恳。
這個(gè)問(wèn)題是這樣的湿颅,Spark MemoryManager 可能認(rèn)為還有10M內(nèi)存,但是此時(shí)實(shí)際JVM可以提供給MemroyManager的內(nèi)存只有5M了粥诫。所以分配內(nèi)存的時(shí)候油航,就拋OOM了。這個(gè)時(shí)候應(yīng)該捕獲該OOM,并且保留已經(jīng)申請(qǐng)到內(nèi)存不歸還怀浆,讓MemoryManger 以為內(nèi)存不夠了谊囚,然后進(jìn)行splill操作,從而湊足需要的內(nèi)存揉稚。我們看TaskMemoryManager.allocatePage 方法秒啦。
如果發(fā)生OOM了,則會(huì)捕獲一次搀玖,,并且通過(guò)acquiredButNotUsed記住已經(jīng)申請(qǐng)的量余境,最后再次調(diào)用allocatePage。這個(gè)時(shí)候allocatePage里的acquireExecutionMemory 方法可能發(fā)現(xiàn)自己內(nèi)存不足了灌诅,就會(huì)發(fā)生spill了芳来,從而釋放出內(nèi)存。
其實(shí)這之前的代碼也考慮過(guò)猜拾,但是沒有在allocatePage的層次上做即舌。這個(gè)Bug估計(jì)在單個(gè)Executor 并行運(yùn)行Task數(shù)比較多的時(shí)候比較嚴(yán)重和容易發(fā)生的。
Fix memory leak in the sorter SPARK-14363
這個(gè)Bug 也是在1.6.2, 2.0.0被修正挎袜。
在Spark排序中顽聂,指針和數(shù)據(jù)時(shí)分開存儲(chǔ)的,進(jìn)行spill操作其實(shí)是把數(shù)據(jù)替換到磁盤上盯仪。但是指針數(shù)組是必須在內(nèi)存里紊搪。當(dāng)數(shù)據(jù)被spill后,相應(yīng)的全景,指向這些記錄的指針其實(shí)也是要被釋放的耀石。數(shù)據(jù)量很大的時(shí)候,指針數(shù)組的大小也很可觀爸黄。而且有一點(diǎn)值得指出的是滞伟,比如某個(gè)Executor 有五個(gè)Task并行運(yùn)行,如果其中有三個(gè)完成了炕贵,那么可用內(nèi)存增大梆奈,緩存到內(nèi)存的數(shù)據(jù)就會(huì)變多,這個(gè)時(shí)候剩下的兩個(gè)Task的指針數(shù)組也會(huì)增大称开,從而占用更多內(nèi)存鉴裹,接著新運(yùn)行的三個(gè)Task可用內(nèi)存變小了,從而失去了公平性。
這些各個(gè)Sorter里都需要修正径荔。
紅框部分便是釋放指針數(shù)組的地方。里面會(huì)重新按初始initialSize值申請(qǐng)一塊指針數(shù)組的內(nèi)存脆霎。