TaskScheduler的初始化機(jī)制
TaskScheduler,如何注冊(cè)Application忧侧,executor如何反向注冊(cè)石窑?
TaskScheduler的初始化機(jī)制
- createTaskScheduler(),內(nèi)部會(huì)創(chuàng)建三個(gè)東西蚓炬。
- 一是TaskSchedulerImpl松逊,它其實(shí)就是我們所說(shuō)的TaskScheduler。
- 二是SparkDeploySchedulerBackend肯夏,它在底層會(huì)負(fù)責(zé)接收TaskSchedulerImpl的控制经宏,實(shí)際上負(fù)責(zé)與Master的注冊(cè),Ececutor的反注冊(cè)熄捍,task發(fā)送到executor等操作烛恤。
- 調(diào)用TaskSchedulerImpl的init()方法,創(chuàng)建SchedulerPool余耽,當(dāng)DAGScheduler要讓TaskScheduler去調(diào)度一些任務(wù)的時(shí)候缚柏,就會(huì)把這些任務(wù)放到調(diào)度池里面,它有不同的優(yōu)先策略碟贾,比如FIFO币喧。
- 調(diào)用TaskSchedulerImpl的start()方法,方法內(nèi)部調(diào)用SparkDeploySchedulerBackend的start()方法袱耽。
- SparkDeploySchedulerBackend的start()方法杀餐,創(chuàng)建一個(gè)東西,AppClient朱巨。
- AppClient史翘,啟動(dòng)一個(gè)線程,創(chuàng)建一個(gè)ClientActor冀续。
- ClientActor線程琼讽,調(diào)用兩個(gè)方法,registerWithMaster()——>tryRegisterAllMasters()洪唐。
- registerWithMaster()——>tryRegisterAllMasters()钻蹬,向MasterActor發(fā)送RegisterApplication(case class,里面封裝了Application的信息)凭需。
- RegisterApplication發(fā)送數(shù)據(jù)到Spark集群的Master——>Worker——>Executor问欠。
- Executor反向注冊(cè)到SparkDeploySchedulerBackend上面去肝匆。
TaskSchedulerImpl底層實(shí)際主要基于SparkDeploySchedulerBackend來(lái)工作。
DAGScheduler
DAGSchedulerEventProcessActor顺献,DAGScheduler底層基于該組件進(jìn)行通信旗国。(線程)
SprkUI
SprkUI,顯示Application運(yùn)行的狀態(tài)注整,啟動(dòng)一個(gè)jetty服務(wù)器粗仓,來(lái)提供web服務(wù),從而顯示網(wǎng)頁(yè)设捐。
源碼分析
package org.apache.spark借浊,SparkContext.scala
// Create and start the scheduler
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
// 這是我們常用的Sparkt提交模式中的standalone方式
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
package org.apache.spark.scheduler,TaskSchedulerImpl.scala
/**
* 1萝招、底層通過(guò)操作一個(gè)SchedulerBackend蚂斤,針對(duì)不同種類的cluster(standlalone、yarn槐沼、mesos)曙蒸,調(diào)度task
* 2、它也可以通過(guò)使用一個(gè)LacalBackend岗钩,并且將isLocal參數(shù)設(shè)置為true纽窟,來(lái)在本地模式下工作
* 3、它負(fù)責(zé)處理一些通用的邏輯兼吓,比如說(shuō)決定多個(gè)job的調(diào)度順序臂港,啟動(dòng)推測(cè)任務(wù)執(zhí)行
* 4、客戶端首先應(yīng)該調(diào)用它的initialize()方法和start()方法视搏,然后通過(guò)runTasks()方法提交task sets
*/
def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
}
}
schedulableBuilder.buildPools()
}
start()方法审孽,
// start()方法,sparkContext.scala
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
taskScheduler.start()
// TaskSchedulerImpl.scala
override def start() {
backend.start()
// SparkDeploySchedulerBackend.scala
override def start() {
super.start()
// 這個(gè)ApplicationDescription浑娜,非常重要
// 它就代表了當(dāng)前執(zhí)行的這個(gè)application的一些情況
// 包括application最大需要多少cpu core佑力,每個(gè)slave上需要多少內(nèi)存
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec)
// 創(chuàng)建了AppClient
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
package org.apache.spark.deploy.client,AppClient.scala筋遭,
/**
* 這是一個(gè)接口
* 它負(fù)責(zé)為application與Spark集群進(jìn)行通信
* 它會(huì)接收一個(gè)spark master的url打颤,以及一個(gè)ApplicationDescripition,和一個(gè)集群事件的監(jiān)聽(tīng)器漓滔,以及各種事件發(fā)生時(shí)编饺,
* 監(jiān)聽(tīng)器的回調(diào)函數(shù)
*/
def start() {
// Just launch an actor; it will call back into the listener.
actor = actorSystem.actorOf(Props(new ClientActor))
}
package org.apache.spark.scheduler,DAGScheduler
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)
} catch {
case e: Exception => {
try {
stop()
} finally {
throw new SparkException("Error while constructing DAGScheduler", e)
}
}
}
/**
* 實(shí)現(xiàn)了面向stage的調(diào)度機(jī)制的高層次的調(diào)度層次和。它會(huì)為每個(gè)job計(jì)算一個(gè)stage的DAG(有向無(wú)環(huán)圖)反肋,
* 追蹤RDD和stage的輸出是否被物化了(物化就是寫入了磁盤或內(nèi)存等地方)那伐,并且尋找一個(gè)最少消耗(最優(yōu)踏施、最惺帷)調(diào)度機(jī)制來(lái)運(yùn)行job。
* 它會(huì)將stage作為tasksets提交到底層的TaskSchedulerImpl上畅形,來(lái)在集群上運(yùn)行它們(task)养距。
*
* 除了stage的DAG,它還負(fù)責(zé)決定運(yùn)行每個(gè)task的最佳位置日熬,基于當(dāng)前的緩存狀態(tài)棍厌,將這些最佳位置提交給底層的TaskSchedulerImpl。
* 此外竖席,它還會(huì)處理由于shuffle輸出文件丟失導(dǎo)致的失敗耘纱,在這種情況下,舊的stage可能會(huì)被重新提交毕荐。
* 一個(gè)stage內(nèi)部的失敗束析,如果不是由于shuffle文件丟失所導(dǎo)致的,會(huì)被TaskScheduler處憎亚,它會(huì)多次重試每一個(gè)task员寇,直到最后,實(shí)在不行了第美,
* 才會(huì)去取消整個(gè)stage蝶锋。
*/
the Spark UI,
// Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
env.securityManager,appName))
} else {
// For tests, do not enable the UI
None
}
本文首發(fā)于steem什往,感謝閱讀扳缕,轉(zhuǎn)載請(qǐng)注明。
微信公眾號(hào)「padluo」别威,分享數(shù)據(jù)科學(xué)家的自我修養(yǎng)第献,既然遇見(jiàn),不如一起成長(zhǎng)兔港。
數(shù)據(jù)分析
讀者交流電報(bào)群
知識(shí)星球交流群
知識(shí)星球讀者交流群