1. 前言
本文主要講述一下我對spark任務(wù)調(diào)度過程的理解叠纹,必要的地方會配合源碼。
2 . Spark任務(wù)調(diào)度
2.1 基本概念
- Job
用戶提交的spark應(yīng)用程序中任何一個action操作(foreach,count...)都對應(yīng)一個job的提交(提交job動作對應(yīng)到源碼SparkContext #runJob方)与涡,因此一個用戶spark應(yīng)用程序往往對應(yīng)一到多個job。比如下面的例子:def main(args:Array[String]){ val sparkConf = new SparkConf().setAppName("Log Query") val sc = new SparkContext(sparkConf) val lines = sc.textFile("README.md",3) val words = lines.flatMap(line => line.split(" ")) val wordOne = words.map(word => (word,1)) val wordCount = wordOne.reduceByKey(_ + _,3) // foreach是一個action豺鼻,對應(yīng)一個job wordCount.foreach(println) // collect是一個action儒飒,對應(yīng)一個job val resultAsArry = wordCount.collect() }
- Stage
Job提交之后檩奠,首先會被劃分為一到多個Stage,劃分Stage的原因在于一個job中有些操作(Transformation)是可以連在一起在同一個線程里執(zhí)行的井誉,這些連在一起的操作就像一根管道一樣,數(shù)據(jù)從順著管道流下去就行(比如.map.filter就可以連在一起)整胃,可是有些操作(shuffle操作颗圣,reduce在岂,group等)會導(dǎo)致管道出現(xiàn)分支,數(shù)據(jù)不得不分流到不同管道蛮寂,Stage的劃分就以這中會導(dǎo)致分流(shuffle)的操作為分割,劃分成不同的Stage及老,顯然劃分會導(dǎo)致Stage的依賴范抓,上游Stage必須運行完,才能讓下游Stage運行叠蝇。Stage和Job一樣是一種靜態(tài)的東西年缎,一個Stage里包含沒有Shuffle依賴(也就是沒有分流)的一連串RDD铃慷。真正提交運行的是Task。 - Task
Task是依據(jù)Stage建立起來的洲鸠,上面說Stage包含了一連串RDD,RDD是一種數(shù)據(jù)的抽象描述绢淀,對應(yīng)物理數(shù)據(jù)是包含了n個分區(qū)數(shù)據(jù)的。每一個Task就處理一個分區(qū)數(shù)據(jù)皆的,一個包含了n個分區(qū)的Stage就會創(chuàng)建出n個Task蹋盆,只有這n個Task都執(zhí)行成功了,這個Stage才算成功楞抡,然后才可以執(zhí)行下游的Stage。 - TaskSet
TaskSet是task的集合召廷,包含了同一個Stage中的部分或者全部task账胧,每次提交的是TaskSet,然后根據(jù)TaskSet創(chuàng)建TaskSetManager梗顺,spark中TaskSetManager是任務(wù)調(diào)度器一個調(diào)度單元寺谤,當(dāng)一個TaskSetManager被調(diào)度器調(diào)度到時,就會從TaskSetManager中拿若干個task去執(zhí)行变屁。task會失敗重試意狠,重試的那些task又會組成一個新的TaskSetManager去讓調(diào)度器調(diào)度,因此闷板,一個正在運行的Stage可能會有多個TaskSetManager正在等待調(diào)度院塞。 - TaskSchedulerImpl
任務(wù)調(diào)度器,它按照一定的策略調(diào)度TaskSetManager拦止,然后會從被調(diào)度到的TaskSetManager獲取若干個task發(fā)送到Executor去執(zhí)行。只要TaskSetManager中有task沒有運行完萧求,那么這個TaskSetManager還是會繼續(xù)被調(diào)度。目前有兩種調(diào)度策略:FIFO和Fair模式元旬。 - CoarseGrainedSchedulerBackend
運行在driver端秒梳,可以當(dāng)作是Rpc的一個端點,從任務(wù)調(diào)度器獲取任務(wù)并發(fā)送到Executor上執(zhí)行(LaunchTask)酪碘,以及接收Executor匯報的Task運行狀態(tài)信息(StatusUpdate) - CoarseGrainedExecutorBackend
運行在Executor上兴垦,從CoarseGrainedSchedulerBackend上接收運行任務(wù)的請求(LaunchTask),任務(wù)運行結(jié)束后通過它向CoarseGrainedSchedulerBackend匯報任務(wù)狀態(tài)(StatusUpdate).
整個過程大致如下圖: