Spark-Core Source Code Deep Dive (2): A Detailed Look at schedule in the Master

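The previous post analyzed in detail how Spark is deployed in Standalone mode. It mentioned that once a Worker finishes registering, the Master runs a schedule operation to allocate resources. This post examines exactly how that method allocates them.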

Note: all posts in this series use the Spark-1.6.3 source code as their reference. Significant changes in Spark-2.1.0 will also be pointed out.

When is schedule called?

Every time a new application joins or resource availability changes, the Master calls schedule to reallocate resources. So how does it allocate them? Let's analyze it at the source-code level.

schedule

Here is the source of schedule:

private def schedule(): Unit = {
  // Resources can only be allocated while this Master's state is ALIVE
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  // As the comment says: drivers are scheduled first, then executors
  // 1. Shuffle the ALIVE workers so that drivers are not always placed on the same worker
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // 2. Iterate over the waiting drivers and try to launch each one
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    // 2.1 Continue while some alive worker has not been visited and the driver is not launched
    while (numWorkersVisited < numWorkersAlive && !launched) {
      // 2.2 Take the worker at curPos; curPos starts at 0, is NOT reset between drivers,
      //     and advances as curPos = (curPos + 1) % numWorkersAlive
      val worker = shuffledAliveWorkers(curPos)
      // 2.3 Count this worker as visited
      numWorkersVisited += 1
      // 2.4 Check whether this worker's free memory and cores meet the driver's requirements
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // 2.5 If so, launch the driver on this worker
        launchDriver(worker, driver)
        // 2.6 Remove it from the waiting queue and mark it as launched
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // 3. Launch executors
  startExecutorsOnWorkers()
}
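To see the round-robin placement in isolation, here is a minimal, self-contained Scala sketch of the same loop. SimWorker, SimDriver and placeDrivers are hypothetical names, not Spark classes. The sketch skips the Random.shuffle step so its result is deterministic, and it replaces launchDriver with a direct deduction of the driver's cores and memory from the chosen worker.

```scala
object DriverPlacementSketch {
  // Hypothetical stand-ins for WorkerInfo / DriverInfo, keeping only what the loop reads.
  final class SimWorker(val id: String, var coresFree: Int, var memoryFree: Int)
  final case class SimDriver(id: String, cores: Int, mem: Int)

  /** Places waiting drivers round-robin; returns driverId -> workerId for launched drivers. */
  def placeDrivers(aliveWorkers: IndexedSeq[SimWorker], waiting: Seq[SimDriver]): Map[String, String] = {
    val numWorkersAlive = aliveWorkers.size
    var curPos = 0 // shared across drivers: each driver starts where the previous one stopped
    var placement = Map.empty[String, String]
    for (driver <- waiting) {
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = aliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          // Stand-in for launchDriver: reserve the driver's resources on this worker
          worker.coresFree -= driver.cores
          worker.memoryFree -= driver.mem
          placement += (driver.id -> worker.id)
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    placement
  }
}
```

Because curPos is shared across drivers, consecutive drivers tend to land on different workers. A driver that fits nowhere is simply absent from the returned map, just as it stays in waitingDrivers in the real code.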

Launching the Driver

I have annotated the allocation flow in detail in the code above. Now let's look at the launchDriver method:

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  // 1. Log
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // 2. Register the driver with the worker; this also adds its memory and cores to the worker's used totals
  worker.addDriver(driver)
  // 3. Keep a reference to this worker in the driver
  driver.worker = Some(worker)
  // 4. Send a LaunchDriver message telling the Worker to start the driver
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  // 5. Set the driver's state to RUNNING
  driver.state = DriverState.RUNNING
}
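Step 2 matters for the next scheduling pass: the memoryFree and coresFree values checked in schedule come from running totals that addDriver updates. The following sketch is a hypothetical, trimmed-down model of that accounting (WorkerModel and its method signatures are illustrative, not Spark's WorkerInfo API), assuming free resources are computed as total minus used.

```scala
import scala.collection.mutable

object WorkerBookkeepingSketch {
  /** Hypothetical model of the Master's view of one worker's resources. */
  final class WorkerModel(val id: String, val cores: Int, val memory: Int) {
    private val drivers = mutable.HashMap.empty[String, (Int, Int)] // driverId -> (cores, mem)
    private var coresUsed = 0
    private var memoryUsed = 0

    // Free resources are derived, so they always reflect the current reservations
    def coresFree: Int = cores - coresUsed
    def memoryFree: Int = memory - memoryUsed

    /** Record a driver placed on this worker and reserve its resources. */
    def addDriver(driverId: String, driverCores: Int, driverMem: Int): Unit = {
      drivers(driverId) = (driverCores, driverMem)
      coresUsed += driverCores
      memoryUsed += driverMem
    }

    /** Forget a driver (e.g. after it ends) and release its resources; unknown ids are ignored. */
    def removeDriver(driverId: String): Unit =
      drivers.remove(driverId).foreach { case (c, m) =>
        coresUsed -= c
        memoryUsed -= m
      }
  }
}
```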

Next, let's see how the Worker handles the LaunchDriver message:

case LaunchDriver(driverId, driverDesc) => {
  // 1. Log
  logInfo(s"Asked to launch driver $driverId")
  // 2. Create a DriverRunner
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  // 3. Record the driver in the drivers map
  drivers(driverId) = driver
  // 4. Start the driver (start() spawns a thread and returns immediately)
  driver.start()

  // 5. Account for the resources the driver occupies on this Worker
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}

Next, follow driver.start():

/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      try {
        // Create the driver's working directory
        val driverDir = createWorkingDirectory()
        // Download the user's jar into the working directory and return its local path
        val localJarFilename = downloadUserJar(driverDir)
        def substituteVariables(argument: String): String = argument match {
          case "{{WORKER_URL}}" => workerUrl
          case "{{USER_JAR}}" => localJarFilename
          case other => other
        }
        // TODO: If we add ability to submit multiple jars they should also be added here
        val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
          driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
        // Run the driver process and wait for it to exit (not analyzed further here);
        // only afterwards is the final state below computed
        launchDriver(builder, driverDir, driverDesc.supervise)
      }
      catch {
        case e: Exception => finalException = Some(e)
      }
      val state =
        if (killed) {
          DriverState.KILLED
        } else if (finalException.isDefined) {
          DriverState.ERROR
        } else {
          finalExitCode match {
            case Some(0) => DriverState.FINISHED
            case _ => DriverState.FAILED
          }
        }
      finalState = Some(state)
      worker.send(DriverStateChanged(driverId, state, finalException))
    }
  }.start()
}
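The block that computes state fixes a precedence: a kill wins over an exception, and an exception wins over the exit code. That decision can be pulled out into a pure function, sketched below. FinalState and finalState are hypothetical helpers that only mirror the four DriverState values used in the code above.

```scala
object DriverFinalStateSketch {
  // Hypothetical local copy of the four terminal DriverState values used in DriverRunner.
  sealed trait FinalState
  case object KILLED extends FinalState
  case object ERROR extends FinalState
  case object FINISHED extends FinalState
  case object FAILED extends FinalState

  /** Same precedence as DriverRunner: killed, then exception, then exit code. */
  def finalState(killed: Boolean, finalException: Option[Exception], finalExitCode: Option[Int]): FinalState =
    if (killed) KILLED
    else if (finalException.isDefined) ERROR
    else finalExitCode match {
      case Some(0) => FINISHED
      case _       => FAILED // non-zero exit code, or no exit code recorded
    }
}
```

Every value this function can return is one of the four states that the Master's DriverStateChanged handler accepts, so, assuming the Worker forwards the message, the Master removes the driver whichever way run() ends.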

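When the driver process exits, whatever the outcome, the thread sends a DriverStateChanged message carrying the final state to the Worker. The Worker handles it in handleDriverStateChanged. We won't go through the details here; the key step is forwarding a DriverStateChanged message to the Master, which then removes the driver's information: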

case DriverStateChanged(driverId, state, exception) => {
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}

This completes the process of allocating resources to the Driver and launching it. Next, let's look at launching the executors, i.e. the startExecutorsOnWorkers() call.

Launching Executors

startExecutorsOnWorkers():

/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Applications are served first-in, first-out
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor
    // Keep ALIVE workers with enough free memory and cores, sorted by free cores in descending order
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse

    // Decide how many cores to allocate on each worker
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Then perform the allocation
    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
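The filter-and-sort step can be tried on its own. The sketch below uses a hypothetical WorkerView case class in place of WorkerInfo and keeps the same three conditions (ALIVE, enough free memory for one executor, at least coresPerExecutor free cores, or 1 if unset), followed by a descending sort on free cores.

```scala
object UsableWorkersSketch {
  // Hypothetical simplified view of a worker, not Spark's WorkerInfo.
  final case class WorkerView(id: String, alive: Boolean, coresFree: Int, memoryFree: Int)

  /** Alive workers that can host at least one executor, most free cores first. */
  def usableWorkers(
      workers: Seq[WorkerView],
      memoryPerExecutorMB: Int,
      coresPerExecutor: Option[Int]): Seq[WorkerView] =
    workers
      .filter(_.alive)
      .filter(w => w.memoryFree >= memoryPerExecutorMB && w.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree) // ascending...
      .reverse             // ...then flipped to descending, as in the Master code
}
```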

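Let's first look at how the number of cores to allocate on each worker is decided. Here we only reproduce the doc comment of scheduleExecutorsOnWorkers and explain it; see the source for the full implementation: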

/**
 * Schedule executors to be launched on the workers.
 * Returns an array containing number of cores assigned to each worker.
 *
 * There are two modes of launching executors. The first attempts to spread out an application's
 * executors on as many workers as possible, while the second does the opposite (i.e. launch them
 * on as few workers as possible). The former is usually better for data locality purposes and is
 * the default.
 *
 * The number of cores assigned to each executor is configurable. When this is explicitly set,
 * multiple executors from the same application may be launched on the same worker if the worker
 * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
 * worker by default, in which case only one executor may be launched on each worker.
 *
 * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
 * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
 * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
 * allocated at a time, 12 cores from each worker would be assigned to each executor.
 * Since 12 < 16, no executors would launch [SPARK-8881].
 */

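In short, there are two placement modes. The first spreads an application's executors across as many workers as possible; the second does the opposite and packs them onto as few workers as possible. The first is the default (spreadOutApps, controlled by spark.deploy.spreadOut) and is usually better for data locality. The number of cores per executor is configurable (through spark-submit or spark-env.sh). If it is set, several executors of the same application may be launched on one worker, provided that worker has enough cores and memory. Otherwise each executor takes all the available cores on its worker, so at most one executor of that application runs on each worker. When cores are actually assigned, the Master tries to satisfy the request as far as it can: if the application needs more cores than the usable workers have free, it assigns as many of the free cores as it can.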
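The rules above can be sketched as follows. This is a simplified model, written for illustration, of what scheduleExecutorsOnWorkers is described as doing: it hands out cores in units of coresPerExecutor (1 if unset), checks memory only when a new executor would be created, and either moves on to the next worker after each step (spread out) or keeps filling the same worker. FreeResources and assignCores are hypothetical names, and other checks the real method may perform are left out.

```scala
object ExecutorPlacementSketch {
  // Hypothetical snapshot of one usable worker's free resources.
  final case class FreeResources(coresFree: Int, memoryFreeMB: Int)

  /** Returns the number of cores assigned to each worker, in the same order as `workers`. */
  def assignCores(
      workers: IndexedSeq[FreeResources],
      coresWanted: Int,              // plays the role of app.coresLeft
      coresPerExecutor: Option[Int], // spark.executor.cores, if set
      memoryPerExecutorMB: Int,
      spreadOut: Boolean): Array[Int] = {
    val minCores = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val assignedCores = new Array[Int](workers.length)
    val assignedExecutors = new Array[Int](workers.length)
    // Never try to hand out more cores than the usable workers have free in total
    var coresToAssign = math.min(coresWanted, workers.map(_.coresFree).sum)

    def canLaunch(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCores
      val enoughCores = workers(pos).coresFree - assignedCores(pos) >= minCores
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        // A new executor also needs its own memory on this worker
        val usedMem = assignedExecutors(pos) * memoryPerExecutorMB
        val enoughMemory = workers(pos).memoryFreeMB - usedMem >= memoryPerExecutorMB
        keepScheduling && enoughCores && enoughMemory
      } else {
        // Growing the single existing executor: only cores matter
        keepScheduling && enoughCores
      }
    }

    var freeWorkers = workers.indices.filter(canLaunch)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepGoing = true
        while (keepGoing && canLaunch(pos)) {
          coresToAssign -= minCores
          assignedCores(pos) += minCores
          if (oneExecutorPerWorker) assignedExecutors(pos) = 1
          else assignedExecutors(pos) += 1
          if (spreadOut) keepGoing = false // spread out: one step, then the next worker
        }
      }
      freeWorkers = freeWorkers.filter(canLaunch)
    }
    assignedCores
  }
}
```

With four 16-core workers and spark.cores.max = 48, spark.executor.cores = 16 (the SPARK-8881 example), spreading out yields 16, 16, 16 and 0 cores, i.e. three full executors. Without spark.executor.cores, 20 requested cores become 5 per worker when spreading out and 16 + 4 when consolidating.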

Next comes the actual allocation of compute resources to the executors and their launch:

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    // Register a new executor with the application
    val exec = app.addExecutor(worker, coresToAssign)
    // Launch the executor
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
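The two lines that compute numExecutors and coresToAssign are easy to check in isolation. The helper below (its name is hypothetical) repeats exactly that arithmetic. The "no remainder" comment holds because, as the scheduleExecutorsOnWorkers doc comment explains, cores are assigned to a worker coresPerExecutor at a time, so assignedCores is a multiple of coresPerExecutor.

```scala
object ExecutorSplitSketch {
  /** (number of executors to launch, cores per launched executor) for one worker. */
  def split(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
    // With spark.executor.cores set: assignedCores / coresPerExecutor executors of that size.
    // Without it: a single executor that takes every core assigned on this worker.
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    (numExecutors, coresToAssign)
  }
}
```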

Launching an executor:

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Record the executor on the worker, reserving its cores and memory
  worker.addExecutor(exec)
  // Tell the worker to launch the executor
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Then tell the application's driver that the executor was added
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

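On receiving the LaunchExecutor message, the worker carries out the actual launch and reports back to the Master.

The Master also sends the executor's resource information to the driver (the ExecutorAdded message), and the driver can then run the application on these executors. This completes the overall flow of allocating and launching executors.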

Finally, a diagram summarizing the simplified flow of launching the Driver and Worker:

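This post has only roughly traced how the Master, when running schedule, allocates resources to the Driver and Executors and launches them. Later posts will analyze the whole application execution flow, and we will go into more detail there.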

References and further reading:

Spark-1.6.3 source code

Spark-2.1.0 source code

This is an original article. Reposting is welcome; please credit the source and the author. Thanks!
