Spark 任務(wù)調(diào)度系統(tǒng)

如何工作

先將用戶構(gòu)建的DAG轉(zhuǎn)化為分布式任務(wù),結(jié)合分布式集群資源的可用性画舌,基于調(diào)度規(guī)則依序把分布式任務(wù)分發(fā)到執(zhí)行器。

執(zhí)行步驟

image

調(diào)度的核心組件

image

DAGScheduler

主要職責(zé):

1.把用戶DAG拆分為Stages

2.在Stage內(nèi)部創(chuàng)建Tasks搀矫,這些任務(wù)囊括了用戶通過組合不同算子實(shí)現(xiàn)的數(shù)據(jù)轉(zhuǎn)換邏輯官觅,然后執(zhí)行器Executors接受到Tasks,將其中封裝的計(jì)算函數(shù)應(yīng)用于分布式數(shù)據(jù)分片洋闽,去執(zhí)行分布式的計(jì)算過程玄柠。

在分發(fā)任務(wù)之前,調(diào)度系統(tǒng)得先判斷哪些節(jié)點(diǎn)得計(jì)算資源空閑诫舅,然后再把任務(wù)分發(fā)過去羽利。

TaskScheduler

職責(zé):基于既定規(guī)則于策略達(dá)成供需雙方的匹配于撮合,核心是任務(wù)調(diào)度的規(guī)則與策略

image

TaskScheduler的調(diào)度分為兩個(gè)層次刊懈,一個(gè)是Stages之間的調(diào)度優(yōu)先級(jí)这弧,一個(gè)是Stages內(nèi)不同任務(wù)之間的調(diào)度優(yōu)先級(jí)。

不同Stages之間

首先虚汛,對于兩個(gè)或者多個(gè)Stages匾浪,如果它們之間不存在依賴關(guān)系,互相獨(dú)立卷哩,在面對同一份可用資源的時(shí)候蛋辈,就會(huì)存在競爭關(guān)系,這個(gè)時(shí)候殉疼,先調(diào)度誰梯浪,或者說誰有限享用這份計(jì)算資源。就得基于既定規(guī)則與協(xié)議瓢娜。

TaskScheduler提供 兩種調(diào)度模式挂洛,分別是FIFO和FAIR(公平調(diào)度),F(xiàn)AIR取決于配置文件failscheduler.xml定義眠砾。

同一個(gè)Stage之間

當(dāng)TaskScheduler接受到來自于SchedulerBackend的WorkerOffer后虏劲,TaskScheduler會(huì)優(yōu)先選擇那些滿足本地性級(jí)別的要求的任務(wù)進(jìn)行分發(fā)。本地性級(jí)別有4種,process local < Node local <Rack local < Any,分別是進(jìn)程本地性柒巫,節(jié)點(diǎn)本地性励堡,機(jī)架本地性和跨機(jī)架本地性。從左到右堡掏,計(jì)算任務(wù)訪問所需數(shù)據(jù)的效率越來越差应结。

進(jìn)程本地性表示計(jì)算任務(wù)所需的輸入數(shù)據(jù)就在某一個(gè)Executor進(jìn)程內(nèi),因此把這樣的計(jì)算任務(wù)調(diào)度到目標(biāo)進(jìn)程內(nèi)最劃算泉唁。同理鹅龄,如果數(shù)據(jù)源還未加載到Executor進(jìn)程,而是存儲(chǔ)在某一計(jì)算節(jié)點(diǎn)的磁盤中亭畜,那么把任務(wù)調(diào)度到目標(biāo)節(jié)點(diǎn)上去扮休,也是一個(gè)不錯(cuò)的選擇。再次拴鸵,如果我們無法確定輸入源在哪臺(tái)機(jī)器玷坠,但可以肯定它一定在某個(gè)機(jī)架上,本地性級(jí)別就會(huì)退化到Racklocal劲藐。

DAGScheduler劃分Stages八堡、創(chuàng)建分布式任務(wù)的過程中,會(huì)為每一個(gè)任務(wù)指定本地性級(jí)別,本地性級(jí)別中會(huì)記錄該任務(wù)有意向的計(jì)算節(jié)點(diǎn)地址瘩燥,甚至是Executor進(jìn)程 ID秕重。換句話說,任務(wù)自帶調(diào)度意愿厉膀,它通過本地性級(jí)別告訴TaskScheduler自己更樂意被調(diào)度到哪里去溶耘。

由此可見,Spark調(diào)度系統(tǒng)的原則是盡可能地讓數(shù)據(jù)呆在原地服鹅、保持不動(dòng)凳兵,同時(shí)盡可能地把承載計(jì)算任務(wù)的代碼分發(fā)到離數(shù)據(jù)最近的地方,從而最大限度地降低分布式系統(tǒng)中的網(wǎng)絡(luò)開銷企软。畢竟庐扫,分發(fā)代碼的開銷要比分發(fā)數(shù)據(jù)的代價(jià)低太多,這也正是“數(shù)據(jù)不動(dòng)代碼動(dòng)”這個(gè)說法的由來仗哨。

SchedulerBackend

對于資源調(diào)度器得封裝和抽象形庭,支持Standalone,yarn等厌漂,SchedulerBackend使用ExecutorDataMap得數(shù)據(jù)結(jié)構(gòu)記錄Executor得資源狀態(tài)萨醒。是一種HashMap,key標(biāo)記Executor得字符串苇倡,value是一種ExecutorData得數(shù)據(jù)機(jī)構(gòu)富纸,ExecutorData封裝了Executor得資源狀態(tài)囤踩,例如RPC地址,主機(jī)地址晓褪,可用CPU核數(shù)堵漱,滿配CPU核數(shù)等等。是Executor得“資源畫像”涣仿。

// any protection. But accessing `executorDataMap` out of the inherited methods must be
// protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should only
// be modified in the inherited methods from ThreadSafeRpcEndpoint with protection by
// `CoarseGrainedSchedulerBackend.this`.
private val executorDataMap = new HashMap[String, ExecutorData]
private[cluster] class ExecutorData(
    val executorEndpoint: RpcEndpointRef,
    val executorAddress: RpcAddress,
    override val executorHost: String,
    var freeCores: Int,
    override val totalCores: Int,
    override val logUrlMap: Map[String, String],
    override val attributes: Map[String, String],
    override val resourcesInfo: Map[String, ExecutorResourceInfo],
    override val resourceProfileId: Int,
    val registrationTs: Long
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
  resourcesInfo, resourceProfileId)

SchedulerBackend對內(nèi)是Executor的資源畫像勤庐,對外是SchedulerBackend以WorkerOffer為粒度提供計(jì)算資源。WorkerOffer封裝了Executor ID变过,主機(jī)地址埃元、CPU核數(shù)涝涤,用來表示一份可用于調(diào)度任務(wù)的空閑資源媚狰。

private[spark]
case class WorkerOffer(
    executorId: String,
    host: String,
    cores: Int,
    // `address` is an optional hostPort string, it provide more useful information than `host`
    // when multiple executors are launched on the same host.
    address: Option[String] = None,
    resources: Map[String, Buffer[String]] = Map.empty,
    resourceProfileId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)

總的來說,TaskScheduler根據(jù)本地性級(jí)別遴選出待計(jì)算任務(wù)之后阔拳,先對這些任務(wù)進(jìn)行序列化崭孤。然后,交給SchedulerBackend,SchedulerBackend根據(jù)ExecutorData 中記錄的RPC地址和主機(jī)地址糊肠,再將序列化的任務(wù)通過網(wǎng)絡(luò)分發(fā)到目的主機(jī)的Executor中去辨宠。最后,Executor接收到任務(wù)之后货裹,把任務(wù)交由內(nèi)置的線程池嗤形,線程池中的多線程則并發(fā)地在不同數(shù)據(jù)分片之上執(zhí)行任務(wù)中封裝的數(shù)據(jù)處理函數(shù),從而實(shí)現(xiàn)分布式計(jì)算弧圆。

DAGScheduler在創(chuàng)建Tasks 的過程中赋兵,是如何設(shè)置每一個(gè)任務(wù)的本地性級(jí)別?

位置信息通過特定的字符串前綴格式標(biāo)識(shí) executor_[hostname][executorid] [hostname] hdfs_cache[hostname] DAGScheduler會(huì)嘗試獲取RDD的每個(gè)Partition的偏好位置信息,a.如果RDD被緩存搔预,通過緩存的位置信息獲取每個(gè)分區(qū)的位置信息霹期;b.如果RDD有preferredLocations屬性,通過preferredLocations獲取每個(gè)分區(qū)的位置信息拯田;c. 遍歷RDD的所有是NarrowDependency的父RDD历造,找到第一個(gè)滿足a,b條件的位置信息 DAGScheduler將生成好的TaskSet提交給TaskSetManager進(jìn)行任務(wù)的本地性級(jí)別計(jì)算

參考資料:吳磊老師的Spark調(diào)優(yōu)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市船庇,隨后出現(xiàn)的幾起案子吭产,更是在濱河造成了極大的恐慌,老刑警劉巖鸭轮,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件臣淤,死亡現(xiàn)場離奇詭異,居然都是意外死亡张弛,警方通過查閱死者的電腦和手機(jī)荒典,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進(jìn)店門酪劫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人寺董,你說我怎么就攤上這事覆糟。” “怎么了遮咖?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵滩字,是天一觀的道長。 經(jīng)常有香客問我御吞,道長麦箍,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任陶珠,我火速辦了婚禮挟裂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘揍诽。我一直安慰自己诀蓉,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布暑脆。 她就那樣靜靜地躺著渠啤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪添吗。 梳的紋絲不亂的頭發(fā)上沥曹,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天,我揣著相機(jī)與錄音碟联,去河邊找鬼妓美。 笑死,一個(gè)胖子當(dāng)著我的面吹牛玄帕,可吹牛的內(nèi)容都是我干的部脚。 我是一名探鬼主播,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼裤纹,長吁一口氣:“原來是場噩夢啊……” “哼委刘!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鹰椒,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤锡移,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后漆际,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體淆珊,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年奸汇,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了施符。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片往声。...
    茶點(diǎn)故事閱讀 38,094評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖戳吝,靈堂內(nèi)的尸體忽然破棺而出浩销,到底是詐尸還是另有隱情,我是刑警寧澤听哭,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布慢洋,位于F島的核電站,受9級(jí)特大地震影響陆盘,放射性物質(zhì)發(fā)生泄漏普筹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一隘马、第九天 我趴在偏房一處隱蔽的房頂上張望太防。 院中可真熱鬧,春花似錦祟霍、人聲如沸杏头。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至呢燥,卻和暖如春崭添,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背叛氨。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工呼渣, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人寞埠。 一個(gè)月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓屁置,卻偏偏與公主長得像,于是被迫代替她去往敵國和親仁连。 傳聞我的和親對象是個(gè)殘疾皇子蓝角,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容