????????接著上一篇,我們接著來分析下一個非常重要的組建DAGScheduler的運(yùn)行原理是怎么實(shí)現(xiàn)的;通過之前對Spark的分析講解,我們的Spark作業(yè)是在遇到一個action算子以后并以此為界限,劃分出一個Job出來蛮拔,也就是在這個時候,Spark作業(yè)向集群提交一個Job任務(wù)痹升;下面我們看看源碼是如何實(shí)現(xiàn)的建炫;
????????通過在任何一個action操作的算子中追蹤發(fā)現(xiàn),最終提交一個Job是調(diào)用了SparkContext的runJob方法實(shí)現(xiàn)的疼蛾,在該方法中通過dagSchedualer.runJob()正式向集群提交一個Job任務(wù)肛跌,接下來重點(diǎn)來了,我們來看看DAGScheduler是如何對一個Job進(jìn)行stage劃分的;
這里通過eventProcessLoop對象將Job進(jìn)行提交,下面我們看看在eventProcessLoop中具體發(fā)生了什么;
1.首先,創(chuàng)建出與partition數(shù)量相等的task;
2.由觸發(fā)Job提交的那個RDD算子作為作為起點(diǎn)揉稚,創(chuàng)建第一個stage并命名為finalStage;
3.對于if條件成立的內(nèi)容牺荠,是針對于本地模式運(yùn)行的卫玖,我們主要來分析一下集群模式下的工作模式,在else邏輯中原朝,我們可以看到調(diào)用了submitStage的方法驯嘱,該方法就是實(shí)現(xiàn)stage劃分的重要實(shí)現(xiàn);
1.在該方法中我們調(diào)用了getMissingParentStages()方法喳坠,并將其RDD壓入一個棧中鞠评;
2.在這個方法中,首先彈棧獲得棧頂?shù)腞DD壕鹉,并使用循環(huán)反復(fù)調(diào)用當(dāng)前RDD所依賴的父RDD剃幌,并判斷其父RDD是寬依賴還是窄依賴;
3.如果是寬依賴晾浴,則創(chuàng)建一個新的stage负乡,并將其加入到missingStage緩存中;如果是窄依賴的話脊凰,則將當(dāng)前的RDD在壓入棧中抖棘;
4.如此往復(fù),直到一個stage遍歷完成狸涌;
5.運(yùn)行完以上動作之后切省,接著使用遞歸操作,重復(fù)調(diào)用submitStage()方法帕胆,直到?jīng)]有父Stage的時候朝捆,即方法返回結(jié)果為Nil的時候,開始調(diào)用submitMissingTask將一個stage(即一個Taskset)提交給TaskScheduler去懒豹;
總結(jié):至此芙盘,我們的DAGScheduler的stage劃分算法基本上就介紹完了,下篇文章我們來接著介紹當(dāng)一個Taskset被提交給TaskScheduler后歼捐,TaskScheduler是如何對一個Taskset集合中的每個Task進(jìn)行合理分配的何陆,即我們的Task分配算法是如何實(shí)現(xiàn)的,歡迎關(guān)注豹储。
如需轉(zhuǎn)載贷盲,請注明: