Scheduler模塊主要負責stage的劃分窥淆,以及job的調度及submit座云。是整個spark計算流程中比較重要的部分碎连。
?1.從saveAsTextFile開始
??閱讀該部分代碼可以從任意一個action方法開始训桶,例如saveAsTextFile方法:
一路跟下去直到PairRDDFunctions.saveAsHadoopDataset中有一行self.context.runJob(self,writeToFile)奄喂,如下:
該方法中入參writeToFile是用來定義stage執(zhí)行邏輯的函數(要注意的是scala里函數和方法是有區(qū)別的)婆赠,該函數運用了closure特性在實際運行過程中不斷的針對stage的各個partition信息(不同的partition輸出位置等屬性不同)來重新初始化自由變量writer绵脯。
而Utils.tryWithSafeFinallyAndFailureCallbacks方法則是一個被curry化的函數,封裝了固定的異常處理機制休里。
因為當前action是saveTextAsHadoopFile操作蛆挫,所以該函數的功能就是將當前拿到的partition的數據寫入到指定路徑。
2.DAGScheduler.submitJob
然后看runJob方法妙黍,從這里一路跟蹤到DAGScheduler.submitJob方法(在跟蹤過程中會看到對func序列化的操作)悴侵,然后會看到這里:
Spark的計算框架是基于Event隊列機制運作的,諸如job的submit拭嫁、cancel可免,Excecutor的添加、丟失等操作做粤。當需要執(zhí)行某操作時浇借,會向操作對應的EventLoop中發(fā)送Event,該Event會被添加至Queue中怕品,然后順序處理妇垢。如下為DAGSchedulerEventProcessLoop的消息處理邏輯:
3.DAGScheduler.handleJobSubmitted
點進去看到DAGScheduler. handleJobSubmitted方法。
在該方法中肉康,將當前正在運行的job添加至active中闯估,發(fā)送SparkListenerJobStart事件,用來監(jiān)控Job處理的進度吼和,在UI界面上展示涨薪。
4.DAGScheduler. submitStage
然后看submitStage方法,該方法是用來提交spark job的炫乓,提交時會從整個DAG圖的最后一個stage開始進行刚夺,逐個查找其parent stage献丑,直到找不到未執(zhí)行的parent stage后再開始執(zhí)行當前遞歸查找到的stage中的tasks。在stage查找其parent stages的過程中光督,會更新stage的狀態(tài)變更為waiting阳距、running、failed结借。下面具體分析一下:
首先系統(tǒng)會判斷入參stage是否目前為止還未被調度過(分為因parent stage missing而等待、執(zhí)行中卒茬、執(zhí)行失敗三種狀態(tài))船老。這里的missing應該是指未被系統(tǒng)檢測到也就是待計算的意思。因為查找是從DAG圖的最后一個stage開始的圃酵,在查找開始前其parent都是missing的柳畔。
如果是則通過getMissingParentStages獲取其missing的parent stage。getMissingParentStages稍后再解釋郭赐。
如果stage沒有parent薪韩,則說明當前DAG分支已經找到了source,這時候直接提交stage的task即可捌锭。提交task的方法為submitMissingTasks俘陷。
如果找到了parent,則依次將其parent全部提交观谦,然后遞歸查找其parent的parent stage拉盾,同時將當前stage添加到waitingStages贿肩,直到job完成或失敗后洲赵,stage會被-=掉,參考(DAGScheduler. markMapStageJobAsFinished方法)农渊。
submitStages方法執(zhí)行后泻红,會調用submitWaitingStages方法夭禽,將當前waiting的stages按照升序提交一下。
5.DAGScheduler. getMissingParentStages
下面看一下getMissingParentStages方法:
該方法用來查找一個stage的parent stage的谊路,也就是我們所說的劃分stage的邏輯讹躯。因為stage是由rdd組成,劃分stage是基于rdd之間的依賴關系是否為shuffleDependency(寬依賴)來決定的凶异。代碼中可以看到蜀撑,如果是寬依賴則構建一個stage,如果是窄依賴剩彬,則繼續(xù)向上查找酷麦。
在該方法中構建了一個stack waitingForVisit,用來存儲當前迭代到的不是shuffleDependency的rdd喉恋。
將當前stage的rdd push到這個stack中沃饶,調用visit方法判斷其Dependency類型母廷。Visit方法首先會判斷這個rdd是否已經計算完畢,判斷依據為該rdd的partition是否都有了輸出糊肤。
如果沒有計算完成琴昆,則判斷其Dependency類型,如果為寬依賴馆揉,則封裝一個stage添加到missing中业舍。如果未窄依賴,則獲取該窄依賴的rdd升酣,將其push到stack中舷暮,待下一次繼續(xù)查找。
直到最終返回missing噩茄。