Spark實(shí)戰(zhàn)(3)_SparkContext原理剖析與源碼分析

TaskScheduler的初始化機(jī)制

TaskScheduler,如何注冊(cè)Application忧侧,executor如何反向注冊(cè)石窑?

TaskScheduler的初始化機(jī)制
  1. createTaskScheduler(),內(nèi)部會(huì)創(chuàng)建三個(gè)東西蚓炬。
  2. 一是TaskSchedulerImpl松逊,它其實(shí)就是我們所說(shuō)的TaskScheduler。
  3. 二是SparkDeploySchedulerBackend肯夏,它在底層會(huì)負(fù)責(zé)接收TaskSchedulerImpl的控制经宏,實(shí)際上負(fù)責(zé)與Master的注冊(cè),Ececutor的反注冊(cè)熄捍,task發(fā)送到executor等操作烛恤。
  4. 調(diào)用TaskSchedulerImpl的init()方法,創(chuàng)建SchedulerPool余耽,當(dāng)DAGScheduler要讓TaskScheduler去調(diào)度一些任務(wù)的時(shí)候缚柏,就會(huì)把這些任務(wù)放到調(diào)度池里面,它有不同的優(yōu)先策略碟贾,比如FIFO币喧。
  5. 調(diào)用TaskSchedulerImpl的start()方法,方法內(nèi)部調(diào)用SparkDeploySchedulerBackend的start()方法袱耽。
  6. SparkDeploySchedulerBackend的start()方法杀餐,創(chuàng)建一個(gè)東西,AppClient朱巨。
  7. AppClient史翘,啟動(dòng)一個(gè)線程,創(chuàng)建一個(gè)ClientActor冀续。
  8. ClientActor線程琼讽,調(diào)用兩個(gè)方法,registerWithMaster()——>tryRegisterAllMasters()洪唐。
  9. registerWithMaster()——>tryRegisterAllMasters()钻蹬,向MasterActor發(fā)送RegisterApplication(case class,里面封裝了Application的信息)凭需。
  10. RegisterApplication發(fā)送數(shù)據(jù)到Spark集群的Master——>Worker——>Executor问欠。
  11. Executor反向注冊(cè)到SparkDeploySchedulerBackend上面去肝匆。

TaskSchedulerImpl底層實(shí)際主要基于SparkDeploySchedulerBackend來(lái)工作。

DAGScheduler

DAGSchedulerEventProcessActor顺献,DAGScheduler底層基于該組件進(jìn)行通信旗国。(線程)

SprkUI

SprkUI,顯示Application運(yùn)行的狀態(tài)注整,啟動(dòng)一個(gè)jetty服務(wù)器粗仓,來(lái)提供web服務(wù),從而顯示網(wǎng)頁(yè)设捐。

源碼分析

package org.apache.spark借浊,SparkContext.scala

// Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)
    
// 這是我們常用的Sparkt提交模式中的standalone方式
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

package org.apache.spark.scheduler,TaskSchedulerImpl.scala

/**
  * 1萝招、底層通過(guò)操作一個(gè)SchedulerBackend蚂斤,針對(duì)不同種類的cluster(standlalone、yarn槐沼、mesos)曙蒸,調(diào)度task
  * 2、它也可以通過(guò)使用一個(gè)LacalBackend岗钩,并且將isLocal參數(shù)設(shè)置為true纽窟,來(lái)在本地模式下工作
  * 3、它負(fù)責(zé)處理一些通用的邏輯兼吓,比如說(shuō)決定多個(gè)job的調(diào)度順序臂港,啟動(dòng)推測(cè)任務(wù)執(zhí)行
  * 4、客戶端首先應(yīng)該調(diào)用它的initialize()方法和start()方法视搏,然后通過(guò)runTasks()方法提交task sets
  */
  
def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }

start()方法审孽,

// start()方法,sparkContext.scala
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  taskScheduler.start()
  
// TaskSchedulerImpl.scala
override def start() {
    backend.start()

// SparkDeploySchedulerBackend.scala    
override def start() {
    super.start()
    
// 這個(gè)ApplicationDescription浑娜,非常重要
    // 它就代表了當(dāng)前執(zhí)行的這個(gè)application的一些情況
    // 包括application最大需要多少cpu core佑力,每個(gè)slave上需要多少內(nèi)存
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec)

    // 創(chuàng)建了AppClient
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    client.start()

package org.apache.spark.deploy.client,AppClient.scala筋遭,

/**
  * 這是一個(gè)接口
  * 它負(fù)責(zé)為application與Spark集群進(jìn)行通信
  * 它會(huì)接收一個(gè)spark master的url打颤,以及一個(gè)ApplicationDescripition,和一個(gè)集群事件的監(jiān)聽(tīng)器漓滔,以及各種事件發(fā)生時(shí)编饺,
  * 監(jiān)聽(tīng)器的回調(diào)函數(shù)
  */
  
def start() {
    // Just launch an actor; it will call back into the listener.
    actor = actorSystem.actorOf(Props(new ClientActor))
  }

package org.apache.spark.scheduler,DAGScheduler

@volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => {
      try {
        stop()
      } finally {
        throw new SparkException("Error while constructing DAGScheduler", e)
      }
    }
  }
  
/**
  * 實(shí)現(xiàn)了面向stage的調(diào)度機(jī)制的高層次的調(diào)度層次和。它會(huì)為每個(gè)job計(jì)算一個(gè)stage的DAG(有向無(wú)環(huán)圖)反肋,
  * 追蹤RDD和stage的輸出是否被物化了(物化就是寫入了磁盤或內(nèi)存等地方)那伐,并且尋找一個(gè)最少消耗(最優(yōu)踏施、最惺帷)調(diào)度機(jī)制來(lái)運(yùn)行job。
  * 它會(huì)將stage作為tasksets提交到底層的TaskSchedulerImpl上畅形,來(lái)在集群上運(yùn)行它們(task)养距。
  *
  * 除了stage的DAG,它還負(fù)責(zé)決定運(yùn)行每個(gè)task的最佳位置日熬,基于當(dāng)前的緩存狀態(tài)棍厌,將這些最佳位置提交給底層的TaskSchedulerImpl。
  * 此外竖席,它還會(huì)處理由于shuffle輸出文件丟失導(dǎo)致的失敗耘纱,在這種情況下,舊的stage可能會(huì)被重新提交毕荐。
  * 一個(gè)stage內(nèi)部的失敗束析,如果不是由于shuffle文件丟失所導(dǎo)致的,會(huì)被TaskScheduler處憎亚,它會(huì)多次重試每一個(gè)task员寇,直到最后,實(shí)在不行了第美,
  * 才會(huì)去取消整個(gè)stage蝶锋。
  */

the Spark UI,

// Initialize the Spark UI
  private[spark] val ui: Option[SparkUI] =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
        env.securityManager,appName))
    } else {
      // For tests, do not enable the UI
      None
    }

本文首發(fā)于steem什往,感謝閱讀扳缕,轉(zhuǎn)載請(qǐng)注明。

https://steemit.com/@padluo


微信公眾號(hào)「padluo」别威,分享數(shù)據(jù)科學(xué)家的自我修養(yǎng)第献,既然遇見(jiàn),不如一起成長(zhǎng)兔港。

數(shù)據(jù)分析

讀者交流電報(bào)群

https://t.me/sspadluo


知識(shí)星球交流群

知識(shí)星球讀者交流群
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末庸毫,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子衫樊,更是在濱河造成了極大的恐慌飒赃,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,509評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件科侈,死亡現(xiàn)場(chǎng)離奇詭異载佳,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)臀栈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門蔫慧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人权薯,你說(shuō)我怎么就攤上這事姑躲∷铮” “怎么了?”我有些...
    開封第一講書人閱讀 163,875評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵黍析,是天一觀的道長(zhǎng)卖怜。 經(jīng)常有香客問(wèn)我,道長(zhǎng)阐枣,這世上最難降的妖魔是什么马靠? 我笑而不...
    開封第一講書人閱讀 58,441評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮蔼两,結(jié)果婚禮上甩鳄,老公的妹妹穿的比我還像新娘。我一直安慰自己额划,他們只是感情好娩贷,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,488評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著锁孟,像睡著了一般彬祖。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上品抽,一...
    開封第一講書人閱讀 51,365評(píng)論 1 302
  • 那天储笑,我揣著相機(jī)與錄音,去河邊找鬼圆恤。 笑死突倍,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的盆昙。 我是一名探鬼主播羽历,決...
    沈念sama閱讀 40,190評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼淡喜!你這毒婦竟也來(lái)了秕磷?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,062評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤炼团,失蹤者是張志新(化名)和其女友劉穎澎嚣,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瘟芝,經(jīng)...
    沈念sama閱讀 45,500評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡易桃,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,706評(píng)論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了锌俱。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片晤郑。...
    茶點(diǎn)故事閱讀 39,834評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出造寝,到底是詐尸還是另有隱情磕洪,我是刑警寧澤,帶...
    沈念sama閱讀 35,559評(píng)論 5 345
  • 正文 年R本政府宣布匹舞,位于F島的核電站,受9級(jí)特大地震影響线脚,放射性物質(zhì)發(fā)生泄漏赐稽。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,167評(píng)論 3 328
  • 文/蒙蒙 一浑侥、第九天 我趴在偏房一處隱蔽的房頂上張望姊舵。 院中可真熱鬧,春花似錦寓落、人聲如沸括丁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)史飞。三九已至,卻和暖如春仰税,著一層夾襖步出監(jiān)牢的瞬間构资,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工陨簇, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留吐绵,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,958評(píng)論 2 370
  • 正文 我出身青樓河绽,卻偏偏與公主長(zhǎng)得像己单,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子耙饰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,779評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容