如何工作
先將用戶構(gòu)建的DAG轉(zhuǎn)化為分布式任務(wù),結(jié)合分布式集群資源的可用性画舌,基于調(diào)度規(guī)則依序把分布式任務(wù)分發(fā)到執(zhí)行器。
執(zhí)行步驟
調(diào)度的核心組件
DAGScheduler
主要職責(zé):
1.把用戶DAG拆分為Stages
2.在Stage內(nèi)部創(chuàng)建Tasks搀矫,這些任務(wù)囊括了用戶通過組合不同算子實(shí)現(xiàn)的數(shù)據(jù)轉(zhuǎn)換邏輯官觅,然后執(zhí)行器Executors接受到Tasks,將其中封裝的計(jì)算函數(shù)應(yīng)用于分布式數(shù)據(jù)分片洋闽,去執(zhí)行分布式的計(jì)算過程玄柠。
在分發(fā)任務(wù)之前,調(diào)度系統(tǒng)得先判斷哪些節(jié)點(diǎn)得計(jì)算資源空閑诫舅,然后再把任務(wù)分發(fā)過去羽利。
TaskScheduler
職責(zé):基于既定規(guī)則于策略達(dá)成供需雙方的匹配于撮合,核心是任務(wù)調(diào)度的規(guī)則與策略
TaskScheduler的調(diào)度分為兩個(gè)層次刊懈,一個(gè)是Stages之間的調(diào)度優(yōu)先級(jí)这弧,一個(gè)是Stages內(nèi)不同任務(wù)之間的調(diào)度優(yōu)先級(jí)。
不同Stages之間
首先虚汛,對于兩個(gè)或者多個(gè)Stages匾浪,如果它們之間不存在依賴關(guān)系,互相獨(dú)立卷哩,在面對同一份可用資源的時(shí)候蛋辈,就會(huì)存在競爭關(guān)系,這個(gè)時(shí)候殉疼,先調(diào)度誰梯浪,或者說誰有限享用這份計(jì)算資源。就得基于既定規(guī)則與協(xié)議瓢娜。
TaskScheduler提供 兩種調(diào)度模式挂洛,分別是FIFO和FAIR(公平調(diào)度),F(xiàn)AIR取決于配置文件failscheduler.xml定義眠砾。
同一個(gè)Stage之間
當(dāng)TaskScheduler接受到來自于SchedulerBackend的WorkerOffer后虏劲,TaskScheduler會(huì)優(yōu)先選擇那些滿足本地性級(jí)別的要求的任務(wù)進(jìn)行分發(fā)。本地性級(jí)別有4種,process local < Node local <Rack local < Any,分別是進(jìn)程本地性柒巫,節(jié)點(diǎn)本地性励堡,機(jī)架本地性和跨機(jī)架本地性。從左到右堡掏,計(jì)算任務(wù)訪問所需數(shù)據(jù)的效率越來越差应结。
進(jìn)程本地性表示計(jì)算任務(wù)所需的輸入數(shù)據(jù)就在某一個(gè)Executor進(jìn)程內(nèi),因此把這樣的計(jì)算任務(wù)調(diào)度到目標(biāo)進(jìn)程內(nèi)最劃算泉唁。同理鹅龄,如果數(shù)據(jù)源還未加載到Executor進(jìn)程,而是存儲(chǔ)在某一計(jì)算節(jié)點(diǎn)的磁盤中亭畜,那么把任務(wù)調(diào)度到目標(biāo)節(jié)點(diǎn)上去扮休,也是一個(gè)不錯(cuò)的選擇。再次拴鸵,如果我們無法確定輸入源在哪臺(tái)機(jī)器玷坠,但可以肯定它一定在某個(gè)機(jī)架上,本地性級(jí)別就會(huì)退化到Racklocal劲藐。
DAGScheduler劃分Stages八堡、創(chuàng)建分布式任務(wù)的過程中,會(huì)為每一個(gè)任務(wù)指定本地性級(jí)別,本地性級(jí)別中會(huì)記錄該任務(wù)有意向的計(jì)算節(jié)點(diǎn)地址瘩燥,甚至是Executor進(jìn)程 ID秕重。換句話說,任務(wù)自帶調(diào)度意愿厉膀,它通過本地性級(jí)別告訴TaskScheduler自己更樂意被調(diào)度到哪里去溶耘。
由此可見,Spark調(diào)度系統(tǒng)的原則是盡可能地讓數(shù)據(jù)呆在原地服鹅、保持不動(dòng)凳兵,同時(shí)盡可能地把承載計(jì)算任務(wù)的代碼分發(fā)到離數(shù)據(jù)最近的地方,從而最大限度地降低分布式系統(tǒng)中的網(wǎng)絡(luò)開銷企软。畢竟庐扫,分發(fā)代碼的開銷要比分發(fā)數(shù)據(jù)的代價(jià)低太多,這也正是“數(shù)據(jù)不動(dòng)代碼動(dòng)”這個(gè)說法的由來仗哨。
SchedulerBackend
對于資源調(diào)度器得封裝和抽象形庭,支持Standalone,yarn等厌漂,SchedulerBackend使用ExecutorDataMap得數(shù)據(jù)結(jié)構(gòu)記錄Executor得資源狀態(tài)萨醒。是一種HashMap,key標(biāo)記Executor得字符串苇倡,value是一種ExecutorData得數(shù)據(jù)機(jī)構(gòu)富纸,ExecutorData封裝了Executor得資源狀態(tài)囤踩,例如RPC地址,主機(jī)地址晓褪,可用CPU核數(shù)堵漱,滿配CPU核數(shù)等等。是Executor得“資源畫像”涣仿。
// any protection. But accessing `executorDataMap` out of the inherited methods must be
// protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only
// be modified in the inherited methods from ThreadSafeRpcEndpoint with protection by
// `CoarseGrainedSchedulerBackend.this`.
private val executorDataMap = new HashMap[String, ExecutorData]
private[cluster] class ExecutorData(
val executorEndpoint: RpcEndpointRef,
val executorAddress: RpcAddress,
override val executorHost: String,
var freeCores: Int,
override val totalCores: Int,
override val logUrlMap: Map[String, String],
override val attributes: Map[String, String],
override val resourcesInfo: Map[String, ExecutorResourceInfo],
override val resourceProfileId: Int,
val registrationTs: Long
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
resourcesInfo, resourceProfileId)
SchedulerBackend對內(nèi)是Executor的資源畫像勤庐,對外是SchedulerBackend以WorkerOffer為粒度提供計(jì)算資源。WorkerOffer封裝了Executor ID变过,主機(jī)地址埃元、CPU核數(shù)涝涤,用來表示一份可用于調(diào)度任務(wù)的空閑資源媚狰。
private[spark]
case class WorkerOffer(
executorId: String,
host: String,
cores: Int,
// `address` is an optional hostPort string, it provide more useful information than `host`
// when multiple executors are launched on the same host.
address: Option[String] = None,
resources: Map[String, Buffer[String]] = Map.empty,
resourceProfileId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
總的來說,TaskScheduler根據(jù)本地性級(jí)別遴選出待計(jì)算任務(wù)之后阔拳,先對這些任務(wù)進(jìn)行序列化崭孤。然后,交給SchedulerBackend,SchedulerBackend根據(jù)ExecutorData 中記錄的RPC地址和主機(jī)地址糊肠,再將序列化的任務(wù)通過網(wǎng)絡(luò)分發(fā)到目的主機(jī)的Executor中去辨宠。最后,Executor接收到任務(wù)之后货裹,把任務(wù)交由內(nèi)置的線程池嗤形,線程池中的多線程則并發(fā)地在不同數(shù)據(jù)分片之上執(zhí)行任務(wù)中封裝的數(shù)據(jù)處理函數(shù),從而實(shí)現(xiàn)分布式計(jì)算弧圆。
DAGScheduler在創(chuàng)建Tasks 的過程中赋兵,是如何設(shè)置每一個(gè)任務(wù)的本地性級(jí)別?
位置信息通過特定的字符串前綴格式標(biāo)識(shí) executor_[hostname][executorid] [hostname] hdfs_cache[hostname] DAGScheduler會(huì)嘗試獲取RDD的每個(gè)Partition的偏好位置信息,a.如果RDD被緩存搔预,通過緩存的位置信息獲取每個(gè)分區(qū)的位置信息霹期;b.如果RDD有preferredLocations屬性,通過preferredLocations獲取每個(gè)分區(qū)的位置信息拯田;c. 遍歷RDD的所有是NarrowDependency的父RDD历造,找到第一個(gè)滿足a,b條件的位置信息 DAGScheduler將生成好的TaskSet提交給TaskSetManager進(jìn)行任務(wù)的本地性級(jí)別計(jì)算
參考資料:吳磊老師的Spark調(diào)優(yōu)