題記:
這是我的第一篇技術(shù)博文,寫得不好請多提意見纪吮。然后,感謝張志斌老師萎胰,畢業(yè)之前張老師幫助我解一些“神奇的bug”碾盟,現(xiàn)在畢業(yè)一個月,我終于自己開始解自己認為“神奇的bug”奥洼。
背景:
我需要在spark streaming上做一個窗口的統(tǒng)計功能巷疼,但是因為一些原因晚胡,不能利用window相關(guān)算子灵奖。于是,我在driver上保持了一個resultRDD估盘,在DStream內(nèi)不斷地去更新這個resultRDD瓷患,包括新信息的統(tǒng)計,和過期信息的剔除遣妥。
現(xiàn)象:
batchSize設(shè)置為1分鐘擅编,程序剛開始運行的一天內(nèi),每個batch的處理時間都是2秒以下箫踩,如下圖:
運行長時間之后爱态,監(jiān)控頁面如下:(忽略時間戳,為了截圖重啟了程序)
可以看到境钟,每個job都skip了大量的stage锦担,每個stage內(nèi),都skip了大量的task慨削。而且有一個很有意思的現(xiàn)象洞渔,skip的數(shù)量都是遞增的。而且缚态,從skip的數(shù)字上來看磁椒,也很有規(guī)律。
再注意 job內(nèi)stage的執(zhí)行時間玫芦,每個job有2個stage浆熔,加起來平均2 ~ 3秒。但此時桥帆,batch的處理時延已經(jīng)達到了20 ~ 30秒医增。
總結(jié)一下遇到的問題:我的streaming程序連續(xù)運行一周之后师郑,慢了一個數(shù)量級,但實際花費在執(zhí)行上的時間近似不變调窍。到這宝冕,我已經(jīng)認為是一個“神奇的bug”了。
debug:
嚴格的說邓萨,batch的處理時間 = 生成執(zhí)行計劃時間 + task調(diào)度時間 + 各個stage執(zhí)行時間
在我的場景中地梨,batch的處理時間遠高于stage執(zhí)行時間和。就說明缔恳,執(zhí)行計劃生成和task調(diào)度花費了大量時間宝剖。task調(diào)度是yarn負責(zé),開銷主要在分發(fā)策略和網(wǎng)絡(luò)開銷上歉甚,這部分不會太耗時万细。剩下就是執(zhí)行計劃生成了。
在spark中纸泄,執(zhí)行計劃是通過RDD的依賴關(guān)系來生成DAG赖钞,并以此來劃分stage生成執(zhí)行計劃,代碼就不貼了聘裁,大致就是根據(jù)RDD的依賴關(guān)系遞歸地深度優(yōu)先搜索雪营,終止條件就是某個RDD的依賴為空,也就是說搜索到源RDD衡便。
了解了DAG的生成原理之后献起,再回過頭來看文章開頭說的背景,我們來模擬一下DAG的生成镣陕,DStream.foreachRDD谴餐,開始計算,假設(shè)當前時間為 t呆抑,然后t時刻的resultRDD依賴t-1時刻的resultRDD岂嗓,t-1時刻resultRDD依賴于t-2時刻的resultRDD。理肺。
問題的根源找出來了摄闸,隨著時間的推移,依賴的層次越來越多妹萨。最終導(dǎo)致DAG的生成耗費了大量時間年枕。
要解決這個問題,就要清除掉resultRDD的依賴關(guān)系乎完,如何清除熏兄?
答案是 checkpoint
private[spark] def markCheckpointed(): Unit = {
clearDependencies()
partitions_ = null
deps = null
}
在checkpoint之后,spark會清空rdd的依賴。
至此摩桶,“神奇的bug”解決桥状。
至于前面提到的大量skip:DAG生成遍歷了rdd的整個歷史,但是在DAG具體的執(zhí)行過程中硝清,會發(fā)現(xiàn)某一些stage辅斟,task已經(jīng)被運算過,因此不會再次計算芦拿,這樣就產(chǎn)生了skip士飒。
最后,愿我的未來再不會覺得有“神奇的bug”蔗崎。