spark internal - 作業(yè)調(diào)度
作者:劉旭暉 Raymond 轉(zhuǎn)載請(qǐng)注明出處
Email:colorant at 163.com
BLOG:http://blog.csdn.net/colorant/
在Spark中作業(yè)調(diào)度的相關(guān)類最重要的就是DAGScheduler,DAGScheduler顧名思義就是基于DAG圖的Scheduler
DAG全稱 Directed Acyclic Graph,有向無(wú)環(huán)圖屯援。簡(jiǎn)單的來(lái)說(shuō),就是一個(gè)由頂點(diǎn)和有方向性的邊構(gòu)成的圖中希俩,從任意一個(gè)頂點(diǎn)出發(fā),沒(méi)有任何一條路徑會(huì)將其帶回到出發(fā)的頂點(diǎn)纲辽。
在作業(yè)調(diào)度系統(tǒng)中颜武,調(diào)度的基礎(chǔ)就在于判斷多個(gè)作業(yè)任務(wù)的依賴關(guān)系,這些任務(wù)之間可能存在多重的依賴關(guān)系文兑,也就是說(shuō)有些任務(wù)必須先獲得執(zhí)行盒刚,然后另外的相關(guān)依賴任務(wù)才能執(zhí)行,但是任務(wù)之間顯然不應(yīng)該出現(xiàn)任何直接或間接的循環(huán)依賴關(guān)系绿贞,所以本質(zhì)上這種關(guān)系適合用DAG有向無(wú)環(huán)圖來(lái)表示因块。
概括地描述DAGScheduler和TaskScheduler(關(guān)于TaskScheduler的相關(guān)細(xì)節(jié),在我之前的關(guān)于Spark運(yùn)行模式的文章中有)的功能劃分就是:TaskScheduler負(fù)責(zé)實(shí)際每個(gè)具體任務(wù)的物理調(diào)度籍铁,DAGScheduler負(fù)責(zé)將作業(yè)拆分成不同階段的具有依賴關(guān)系的多批任務(wù)涡上,可以理解為DAGScheduler負(fù)責(zé)任務(wù)的邏輯調(diào)度。
基本概念
Task任務(wù) :?jiǎn)蝹€(gè)分區(qū)數(shù)據(jù)集上的最小處理流程單元
TaskSet任務(wù)集:一組關(guān)聯(lián)的拒名,但是互相之間沒(méi)有Shuffle依賴關(guān)系的任務(wù)所組成的任務(wù)集
Stage調(diào)度階段:一個(gè)任務(wù)集所對(duì)應(yīng)的調(diào)度階段
Job作業(yè):一次RDD Action生成的一個(gè)或多個(gè)Stage所組成的一次計(jì)算作業(yè)
運(yùn)行方式
DAGScheduler在SparkContext初始化過(guò)程中實(shí)例化吩愧,一個(gè)SparkContext對(duì)應(yīng)一個(gè)DAGScheduler,DAGScheduler的事件循環(huán)邏輯基于Akka Actor的消息傳遞機(jī)制來(lái)構(gòu)建增显,在DAGScheduler的Start函數(shù)中創(chuàng)建了一個(gè)eventProcessActor用來(lái)處理各種DAGSchedulerEvent雁佳,這些事件包括作業(yè)的提交,任務(wù)狀態(tài)的變化同云,監(jiān)控等等
private[scheduler]case class JobSubmitted(
jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
callSite: String,
listener: JobListener,
properties: Properties = null)
extends DAGSchedulerEvent
private[scheduler]case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
private[scheduler]case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent
private[scheduler]case object AllJobsCancelled extends DAGSchedulerEvent
private[scheduler]
case classBeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
private[scheduler]
case classGettingResultEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
private[scheduler]case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
result: Any,
accumUpdates: Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics)
extends DAGSchedulerEvent
private[scheduler]case class ExecutorAdded(execId: String, host: String) extendsDAGSchedulerEvent
private[scheduler]case class ExecutorLost(execId: String) extends DAGSchedulerEvent
private[scheduler] caseclass TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
private[scheduler]case object ResubmitFailedStages extends DAGSchedulerEvent
private[scheduler]case object StopDAGScheduler extends DAGSchedulerEvent
不論是Client還是TaskScheduler與DAGScheduler的交互方式基本上都是通過(guò)DAGScheduler暴露的函數(shù)接口間接的給eventProcessActor發(fā)送相關(guān)消息
如前面所說(shuō)糖权,DAGScheduler最重要的任務(wù)之一就是計(jì)算作業(yè)和任務(wù)的依賴關(guān)系,制定調(diào)度邏輯
DAGScheduler作業(yè)調(diào)度的兩個(gè)主要入口是submitJob 和 runJob炸站,兩者的區(qū)別在于前者返回一個(gè)Jobwaiter對(duì)象星澳,可以用在異步調(diào)用中,用來(lái)判斷作業(yè)完成或者取消作業(yè)旱易,runJob在內(nèi)部調(diào)用submitJob禁偎,阻塞等待直到作業(yè)完成(或失敗)
具體往DAGScheduler提交作業(yè)的操作阀坏,基本都是封裝在RDD的相關(guān)Action操作里面如暖,不需要用戶顯式的提交作業(yè)
用戶代碼都是基于RDD的一系列計(jì)算操作,實(shí)際運(yùn)行時(shí)全释,這些計(jì)算操作是Lazy執(zhí)行的装处,并不是所有的RDD操作都會(huì)觸發(fā)Spark往Cluster上提交實(shí)際作業(yè),基本上只有一些需要返回?cái)?shù)據(jù)或者向外部輸出的操作才會(huì)觸發(fā)實(shí)際計(jì)算工作浸船,其它的變換操作基本上只是生成對(duì)應(yīng)的RDD記錄依賴關(guān)系妄迁。
DAGScheduler內(nèi)部維護(hù)了各種 task / stage / job之間的映射關(guān)系表
工作流程
提交并運(yùn)行一個(gè)Job的基本流程,包括以下步驟
劃分Stage
當(dāng)某個(gè)操作觸發(fā)計(jì)算李命,向DAGScheduler提交作業(yè)時(shí)登淘,DAGScheduler需要從RDD依賴鏈最末端的RDD出發(fā),遍歷整個(gè)RDD依賴鏈封字,劃分Stage任務(wù)階段黔州,并決定各個(gè)Stage之間的依賴關(guān)系。Stage的劃分是以ShuffleDependency為依據(jù)的阔籽,也就是說(shuō)當(dāng)某個(gè)RDD的運(yùn)算需要將數(shù)據(jù)進(jìn)行Shuffle時(shí)流妻,這個(gè)包含了Shuffle依賴關(guān)系的RDD將被用來(lái)作為輸入信息,構(gòu)建一個(gè)新的Stage笆制,由此為依據(jù)劃分Stage绅这,可以確保有依賴關(guān)系的數(shù)據(jù)能夠按照正確的順序得到處理和運(yùn)算。
以GroupByKey操作為例在辆,該操作返回的結(jié)果實(shí)際上是一個(gè)ShuffleRDD证薇,當(dāng)DAGScheduler遍歷到這個(gè)ShuffleRDD的時(shí)候,因?yàn)槠銬ependency是一個(gè)ShuffleDependency匆篓,于是這個(gè)ShuffleRDD的父RDD以及shuffleDependency等對(duì)象就被用來(lái)構(gòu)建一個(gè)新的Stage浑度,這個(gè)Stage的輸出結(jié)果的分區(qū)方式,則由ShuffleDependency中的Partitioner對(duì)象來(lái)決定鸦概。
可以看到箩张,盡管劃分和構(gòu)建Stage的依據(jù)是ShuffleDependency,對(duì)應(yīng)的RDD也就是這里的ShuffleRDD窗市,但是這個(gè)Stage所處理的數(shù)據(jù)是從這個(gè)shuffleRDD的父RDD開(kāi)始計(jì)算的先慷,只是最終的輸出結(jié)果的位置信息參考了ShuffleRDD返回的ShuffleDependency里所包含的內(nèi)容。而shuffleRDD本身的運(yùn)算操作(其實(shí)就是一個(gè)獲取shuffle結(jié)果的過(guò)程)谨设,是在下一個(gè)Stage里進(jìn)行的熟掂。
生成Job,提交Stage
上一個(gè)步驟得到一個(gè)或多個(gè)有依賴關(guān)系的Stage扎拣,其中直接觸發(fā)Job的RDD所關(guān)聯(lián)的Stage作為FinalStage生成一個(gè)Job實(shí)例赴肚,這兩者的關(guān)系進(jìn)一步存儲(chǔ)在resultStageToJob映射表中,用于在該Stage全部完成時(shí)做一些后續(xù)處理二蓝,如報(bào)告狀態(tài)誉券,清理Job相關(guān)數(shù)據(jù)等。
具體提交一個(gè)Stage時(shí)刊愚,首先判斷該Stage所依賴的父Stage的結(jié)果是否可用踊跟,如果所有父Stage的結(jié)果都可用,則提交該Stage,如果有任何一個(gè)父Stage的結(jié)果不可用商玫,則迭代嘗試提交父Stage箕憾。 所有迭代過(guò)程中由于所依賴Stage的結(jié)果不可用而沒(méi)有提交成功的Stage都被放到waitingStages列表中等待將來(lái)被提交
什么時(shí)候waitingStages中的Stage會(huì)被重新提交呢,當(dāng)一個(gè)屬于中間過(guò)程Stage的任務(wù)(這種類型的任務(wù)所對(duì)應(yīng)的類為ShuffleMapTask)完成以后拳昌,DAGScheduler會(huì)檢查對(duì)應(yīng)的Stage的所有任務(wù)是否都完成了袭异,如果是都完成了,則DAGScheduler將重新掃描一次waitingStages中的所有Stage炬藤,檢查他們是否還有任何依賴的Stage沒(méi)有完成御铃,如果沒(méi)有就可以提交該Stage。
此外每當(dāng)完成一次DAGScheduler的事件循環(huán)以后沈矿,也會(huì)觸發(fā)一次從等待和失敗列表中掃描并提交就緒Stage的調(diào)用過(guò)程
此外每當(dāng)完成一次DAGScheduler的事件循環(huán)以后上真,也會(huì)觸發(fā)一次從等待和失敗列表中掃描并提交就緒Stage的調(diào)用過(guò)程
任務(wù)集的提交
每個(gè)Stage的提交,最終是轉(zhuǎn)換成一個(gè)TaskSet任務(wù)集的提交羹膳,DAGScheduler通過(guò)TaskScheduler接口提交TaskSet睡互,這個(gè)TaskSet最終會(huì)觸發(fā)TaskScheduler構(gòu)建一個(gè)TaskSetManager的實(shí)例來(lái)管理這個(gè)TaskSet的生命周期,對(duì)于DAGScheduler來(lái)說(shuō)提交Stage的工作到此就完成了溜徙。而TaskScheduler的具體實(shí)現(xiàn)則會(huì)在得到計(jì)算資源的時(shí)候湃缎,進(jìn)一步通過(guò)TaskSetManager調(diào)度具體的Task到對(duì)應(yīng)的Executor節(jié)點(diǎn)上進(jìn)行運(yùn)算
任務(wù)作業(yè)完成狀態(tài)的監(jiān)控
要保證相互依賴的job/stage能夠得到順利的調(diào)度執(zhí)行,DAGScheduler就必然需要監(jiān)控當(dāng)前Job / Stage乃至Task的完成情況蠢壹。這是通過(guò)對(duì)外(主要是對(duì)TaskScheduler)暴露一系列的回調(diào)函數(shù)來(lái)實(shí)現(xiàn)的嗓违,對(duì)于TaskScheduler來(lái)說(shuō),這些回調(diào)函數(shù)主要包括任務(wù)的開(kāi)始結(jié)束失敗图贸,任務(wù)集的失敗蹂季,DAGScheduler根據(jù)這些Task的生命周期信息進(jìn)一步維護(hù)Job和Stage的狀態(tài)信息。
此外TaskScheduler還可以通過(guò)回調(diào)函數(shù)通知DAGScheduler具體的Executor的生命狀態(tài)疏日,如果某一個(gè)Executor崩潰了偿洁,或者由于任何原因與Driver失去聯(lián)系了,則對(duì)應(yīng)的Stage的shuffleMapTask的輸出結(jié)果也將被標(biāo)志為不可用沟优,這也將導(dǎo)致對(duì)應(yīng)Stage狀態(tài)的變更涕滋,進(jìn)而影響相關(guān)Job的狀態(tài)梅鹦,再進(jìn)一步可能觸發(fā)對(duì)應(yīng)Stage的重新提交來(lái)重新計(jì)算獲取相關(guān)的數(shù)據(jù)罪裹。
任務(wù)結(jié)果的獲取
一個(gè)具體的任務(wù)在Executor中執(zhí)行完畢以后,其結(jié)果需要以某種形式返回給DAGScheduler贷腕,根據(jù)任務(wù)類型的不同侵俗,任務(wù)的結(jié)果的返回方式也不同
對(duì)于FinalStage所對(duì)應(yīng)的任務(wù)(對(duì)應(yīng)的類為ResultTask)返回給DAGScheduler的是運(yùn)算結(jié)果本身锨用,而對(duì)于ShuffleMapTask,返回給DAGScheduler的是一個(gè)MapStatus對(duì)象隘谣,MapStatus對(duì)象管理了ShuffleMapTask的運(yùn)算輸出結(jié)果在BlockManager里的相關(guān)存儲(chǔ)信息增拥,而非結(jié)果本身,這些存儲(chǔ)位置信息將作為下一個(gè)Stage的任務(wù)的獲取輸入數(shù)據(jù)的依據(jù)
而根據(jù)任務(wù)結(jié)果的大小的不同,ResultTask返回的結(jié)果又分為兩類掌栅,如果結(jié)果足夠小秩仆,則直接放在DirectTaskResult對(duì)象內(nèi),如果超過(guò)特定尺寸(默認(rèn)約10MB)則在Executor端會(huì)將DirectTaskResult先序列化渣玲,再把序列化的結(jié)果作為一個(gè)Block存放在BlockManager里逗概,而后將BlockManager返回的BlockID放在IndirectTaskResult對(duì)象中返回給TaskScheduler弟晚,TaskScheduler進(jìn)而調(diào)用TaskResultGetter將IndirectTaskResult中的BlockID取出并通過(guò)BlockManager最終取得對(duì)應(yīng)的DirectTaskResult忘衍。當(dāng)然從DAGScheduler的角度來(lái)說(shuō),這些過(guò)程對(duì)它來(lái)說(shuō)是透明的卿城,它所獲得的都是任務(wù)的實(shí)際運(yùn)算結(jié)果枚钓。
TaskSetManager
前面提到DAGScheduler負(fù)責(zé)將一組任務(wù)提交給TaskScheduler以后,這組任務(wù)的調(diào)度工作對(duì)它來(lái)說(shuō)就算完成了瑟押,接下來(lái)這組任務(wù)內(nèi)部的調(diào)度邏輯搀捷,則是由TaskSetManager來(lái)完成的。
TaskSetManager的主要接口包括:
ResourceOffer:根據(jù)TaskScheduler所提供的單個(gè)Resource資源包括host多望,executor和locality的要求返回一個(gè)合適的Task嫩舟。TaskSetManager內(nèi)部會(huì)根據(jù)上一個(gè)任務(wù)成功提交的時(shí)間,自動(dòng)調(diào)整自身的Locality匹配策略怀偷,如果上一次成功提交任務(wù)的時(shí)間間隔很長(zhǎng)家厌,則降低對(duì)Locality的要求(例如從最差要求Process Local降低為最差要求Node Local),反之則提高對(duì)Locality的要求椎工。這一動(dòng)態(tài)調(diào)整Locality策略基本可以理解為是為了提高任務(wù)在最佳Locality的情況下得到運(yùn)行的機(jī)會(huì)饭于,因?yàn)镽esource資源可能是在短期內(nèi)分批提供給TaskSetManager的,動(dòng)態(tài)調(diào)整Locality門(mén)檻有助于改善整體的Locality分布情況维蒙。
舉個(gè)例子掰吕,如果TaskSetManager內(nèi)部有a/b兩個(gè)任務(wù)等待調(diào)度,a/b兩個(gè)任務(wù)Prefer的節(jié)點(diǎn)分別是Host A 和 Host B颅痊, 這時(shí)候先有一個(gè)Host C的資源以最差匹配為Rack Local的形式提供給TaskSetManager殖熟,如果沒(méi)有內(nèi)部動(dòng)態(tài)Locality調(diào)整機(jī)制,那么比如a任務(wù)將被調(diào)度斑响。接下來(lái)在很短的時(shí)間間隔內(nèi)菱属,一個(gè)Host A的資源來(lái)到,同樣的b任務(wù)被調(diào)度恋捆。 而原本最佳的情況應(yīng)該是任務(wù)b調(diào)度給Host C照皆, 而任務(wù)a調(diào)度給Host A。
當(dāng)然動(dòng)態(tài)Locality也會(huì)帶來(lái)一定的調(diào)度延遲沸停,因此如何設(shè)置合適的調(diào)整策略也是需要針對(duì)實(shí)際情況來(lái)確定的膜毁。目前可以設(shè)置參數(shù)包括
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
即各個(gè)Locality級(jí)別中TaskSetManager等待分配下一個(gè)任務(wù)的時(shí)間,如果距離上一次成功分配資源的時(shí)間間隔超過(guò)對(duì)應(yīng)的參數(shù)值,則降低匹配要求(即process -> node -> rack -> any)瘟滨, 而每當(dāng)成功分配一個(gè)任務(wù)時(shí)候醒,則重置時(shí)間間隔,并更新Locality級(jí)別為當(dāng)前成功分配的任務(wù)的Locality級(jí)別
handleSuccessfulTask / handleFailedTask /handleTaskGettingResult :用于更新任務(wù)的運(yùn)行狀態(tài)杂瘸,Taskset Manager在這些函數(shù)中除了更新自身維護(hù)的任務(wù)狀態(tài)列表等信息倒淫,用于剩余的任務(wù)的調(diào)度以外,也會(huì)進(jìn)一步調(diào)用DAGScheduler的函數(shù)接口將結(jié)果通知給它败玉。
此外敌土,TaskSetManager在調(diào)度任務(wù)時(shí)還可能進(jìn)一步考慮Speculation的情況,亦即當(dāng)某個(gè)任務(wù)的運(yùn)行時(shí)間超過(guò)其它任務(wù)的運(yùn)行完成時(shí)間的一個(gè)特定比例值時(shí)运翼,該任務(wù)可能被重復(fù)調(diào)度返干。目的當(dāng)然是為了防止某個(gè)運(yùn)行中的Task由于某些特殊原因(例如所在節(jié)點(diǎn)CPU負(fù)載過(guò)高,IO帶寬被占等等)運(yùn)行特別緩慢拖延了整個(gè)Stage的完成時(shí)間血淌,Speculation同樣需要根據(jù)集群和作業(yè)的實(shí)際情況合理配置矩欠,否則可能反而降低集群性能。
Pool 調(diào)度池
前面我們說(shuō)了悠夯,DAGScheduler負(fù)責(zé)構(gòu)建具有依賴關(guān)系的任務(wù)集癌淮,TaskSetManager負(fù)責(zé)在具體的任務(wù)集的內(nèi)部調(diào)度任務(wù),而TaskScheduler負(fù)責(zé)將資源提供給TaskSetManager供其作為調(diào)度任務(wù)的依據(jù)沦补。但是每個(gè)SparkContext可能同時(shí)存在多個(gè)可運(yùn)行的任務(wù)集(沒(méi)有依賴關(guān)系)乳蓄,這些任務(wù)集之間如何調(diào)度,則是由調(diào)度池(POOL)對(duì)象來(lái)決定的策彤,Pool所管理的對(duì)象是下一級(jí)的Pool或者TaskSetManager對(duì)象
TaskSchedulerImpl在初始化過(guò)程中會(huì)根據(jù)用戶設(shè)定的SchedulingMode(默認(rèn)為FIFO)創(chuàng)建一個(gè)rootPool根調(diào)度池栓袖,之后根據(jù)具體的調(diào)度模式再進(jìn)一步創(chuàng)建SchedulableBuilder對(duì)象,具體的SchedulableBuilder對(duì)象的BuildPools方法將在rootPool的基礎(chǔ)上完成整個(gè)Pool的構(gòu)建工作店诗。
目前的實(shí)現(xiàn)有兩種調(diào)度模式裹刮,對(duì)應(yīng)了兩種類型的Pool:
FIFO:先進(jìn)先出型,F(xiàn)IFO Pool直接管理的是TaskSetManager庞瘸,每個(gè)TaskSetManager創(chuàng)建時(shí)都存儲(chǔ)了其對(duì)應(yīng)的StageID捧弃,F(xiàn)IFO pool最終根據(jù)StageID的順序來(lái)調(diào)度TaskSetManager
FAIR:公平調(diào)度,F(xiàn)AIR Pool管理的對(duì)象是下一級(jí)的POOL擦囊,或者TaskSetManager违霞,公平調(diào)度的基本原則是根據(jù)所管理的Pool/TaskSetManager中正在運(yùn)行的任務(wù)的數(shù)量來(lái)判斷優(yōu)先級(jí),用戶可以設(shè)置minShare最小任務(wù)數(shù)瞬场,weight任務(wù)權(quán)重來(lái)調(diào)整對(duì)應(yīng)Pool里的任務(wù)集的優(yōu)先程度买鸽。當(dāng)采用公平調(diào)度模式時(shí),目前所構(gòu)建的調(diào)度池是兩級(jí)的結(jié)構(gòu)贯被,即根調(diào)度池管理一組子調(diào)度池眼五,子調(diào)度池進(jìn)一步管理屬于該調(diào)度池的TaskSetManager
公平調(diào)度模式的配置通過(guò)配置文件來(lái)管理妆艘,默認(rèn)使用fairscheduler.xml文件,范例參見(jiàn)conf目錄下的模板:
<?xmlversionxmlversion="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
由于這里的調(diào)度池是在SparkContext內(nèi)部的調(diào)度看幼,因此其調(diào)度范疇是一個(gè)基于該SparkContext的Spark應(yīng)用程序批旺,正常情況下,多個(gè)Spark應(yīng)用程序之間在調(diào)度池層面是沒(méi)有調(diào)度優(yōu)先級(jí)關(guān)系的诵姜。那么這種調(diào)度模式的應(yīng)用場(chǎng)合是怎樣的呢汽煮? 舉一個(gè)例子就是SparkServer或者SharkServer,作為一個(gè)長(zhǎng)期運(yùn)行的SparkContext棚唆,他們代理運(yùn)行了其它連上Server的Spark應(yīng)用的任務(wù)暇赤,這樣你可以為每個(gè)鏈接按照用戶名指定一個(gè)Pool運(yùn)行,從而實(shí)現(xiàn)用戶優(yōu)先級(jí)和資源分配的合理調(diào)度等瑟俭。
Spark應(yīng)用之間的調(diào)度
在Mesos和YARN模式下翎卓,底層資源調(diào)度系統(tǒng)的調(diào)度策略由Mesos和YARN所決定,只有在Standalone模式下摆寄,Spark Master按照當(dāng)前cluster資源是否滿足等待列表中的Spark應(yīng)用 對(duì)內(nèi)存和CPU資源的需求,而決定是否創(chuàng)建一個(gè)SparkContext對(duì)應(yīng)的Driver坯门,進(jìn)而完成Spark應(yīng)用的啟動(dòng)過(guò)程微饥,這可以粗略近似的認(rèn)為是一種粗顆粒度的有條件的FIFO策略吧