Spark中Master源碼分析(二)

繼續(xù)上一篇的內(nèi)容。上一篇的內(nèi)容為:

Spark中Master源碼分析(一) http://www.reibang.com/p/817a7069d058

4.receive方法跳纳,receive方法中消息類型主要分為以下12種情況:
(1)重新選擇了新Leader,進行數(shù)據(jù)的恢復(fù)
(2)恢復(fù)完畢瞭稼,重新創(chuàng)建Driver鸣峭,完成資源的重新分配
(3)觸發(fā)Leadership的選舉
(4)Master注冊新的Worker
(5)Master注冊新的App,然后重新分配資源
(6)Executor轉(zhuǎn)態(tài)發(fā)生改變籍胯,比如正在運行,執(zhí)行完畢后會發(fā)生的情況
(7)Driver轉(zhuǎn)態(tài)發(fā)生變化,進行相應(yīng)的操作
(8)心跳機制爽冕,通過該機制master和worker保持聯(lián)系
(9)master對于app的狀態(tài)的處理
(10)worker調(diào)度狀態(tài)改變響應(yīng)
(11)沒有注冊的app將認(rèn)為已經(jīng)完成了并移除
(12)通過worker是否超時,從而判斷worker是否dead

12種情況詳細(xì)代碼如下所示:
(1)重新選擇了新Leader披蕉,進行數(shù)據(jù)的恢復(fù)
<code>
case ElectedLeader => {
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
//恢復(fù)數(shù)據(jù)中
beginRecovery(storedApps, storedDrivers, storedWorkers)
//守護單線程1s后發(fā)送一個完成恢復(fù)的請求颈畸,并異步等待響應(yīng)
recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CompleteRecovery)
}
}, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}
}
</code>
(2)恢復(fù)完畢,重新創(chuàng)建Driver没讲,完成資源的重新分配
<code>
case CompleteRecovery => completeRecovery()詳見下①
</code>
①completeRecovery方法如下:
<code>
private def completeRecovery() {
if (state != RecoveryState.RECOVERING) { return }
state = RecoveryState.COMPLETING_RECOVERY
//kill所有的不響應(yīng)的workers和apps
workers.filter(.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(
.state == ApplicationState.UNKNOWN).foreach(finishApplication)
// 重新創(chuàng)建Driver
drivers.filter(_.worker.isEmpty).foreach { d =>
logWarning(s"Driver ${d.id} was not found after master recovery")
if (d.desc.supervise) {
logWarning(s"Re-launching ${d.id}")
relaunchDriver(d)詳見下②
} else {
removeDriver(d.id, DriverState.ERROR, None)
logWarning(s"Did not re-launch ${d.id} because it was not supervised")
}
}
</code>
②relaunchDriver方法如下,將Driver的轉(zhuǎn)態(tài)為RELAUNCHING眯娱,添加到即將創(chuàng)建的Driver列表中,然后重新分配資源
<code>
private def relaunchDriver(driver: DriverInfo) {
driver.worker = None
driver.state = DriverState.RELAUNCHING
waitingDrivers += driver
//重新分配資源爬凑,詳見下③
schedule()
}
</code>
③schedule的方法如下徙缴,該方法主要為等待執(zhí)行的apps安排可用的資源,每當(dāng)一個新的app提交或可用資源(worker等)發(fā)生變化時調(diào)用
<code>
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) { return }
// Drivers優(yōu)先于executors
// 通過Random.shuffle返回一個新的亂序排序的workers集合
val shuffledWorkers = Random.shuffle(workers)
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
//根據(jù)worker和driver信息創(chuàng)建worker嘁信,詳見下④
launchDriver(worker, driver)
waitingDrivers -= driver
}
}
}
//調(diào)用和創(chuàng)建workers上的executors
startExecutorsOnWorkers()
}
</code>
④ launchDriver方法如下于样,根據(jù)worker和driver信息創(chuàng)建worker
<code>
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
//將worker的資源分配給driver
worker.addDriver(driver)
driver.worker = Some(worker)
//worker將啟動driver
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
//將driver的狀態(tài)置位RUNNING
driver.state = DriverState.RUNNING
}
</code>
(3)觸發(fā)Leadership的選舉
<code>
case RevokedLeadership => {
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
}
</code>
(4)Master注冊新的Worker,然后重新分配資源
<code>
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
} else if (idToWorker.contains(id)) {
//通知worker注冊失效吱抚,并退出
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerUiPort, publicAddress)
if (registerWorker(worker)) {
//將新添加的worker信息持久化
persistenceEngine.addWorker(worker)
//worker發(fā)送RegisteredWorker消息百宇,并開始向master發(fā)送心跳
workerRef.send(RegisteredWorker(self, masterWebUiUrl))
//重新分配資源
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " + "address: " + workerAddress)workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress))
}
}
}
</code>
(5)Master注冊新的App,然后重新分配資源
<code>
case RegisterApplication(description, driver) => {
if (state == RecoveryState.STANDBY) {
} else {
logInfo("Registering app " + description.name)
//根據(jù)appdescription和driver創(chuàng)建app
val app = createApplication(description, driver),詳見下①
//注冊app
registerApplication(app)秘豹,詳見下②
logInfo("Registered app " + description.name + " with ID " + app.id)
//將app持久化
persistenceEngine.addApplication(app)
//driver將給AppClient發(fā)送RegisteredApplication消息
driver.send(RegisteredApplication(app.id, self))
//重新分配資源
schedule()
}
}
</code>
①createApplication方法如下携御,根據(jù)appdescription和driver創(chuàng)建app
<code>
private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
//用App的主構(gòu)造器創(chuàng)建一個App
new ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores)
}
</code>
②registerApplication方法如下:
<code>
private def registerApplication(app: ApplicationInfo): Unit = {
val appAddress = app.driver.address
if (addressToApp.contains(appAddress)) {
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
//將app的源信息,比如狀態(tài)既绕、運行時間啄刹、核數(shù)注冊到metrics系統(tǒng)中
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
}
</code>

(6)Executor轉(zhuǎn)態(tài)發(fā)生改變,比如正在運行凄贩,執(zhí)行完畢后會發(fā)生的情況
<code>
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
execOption match {
case Some(exec) => {
val appInfo = idToApp(appId)
exec.state = state
//如果executor正在執(zhí)行任務(wù)誓军,將retry次數(shù)置位0
if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() } //給appClient發(fā)送ExecutorUpdated消息
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
//如果Executor執(zhí)行完了,移除worker和app上的executor
if (ExecutorState.isFinished(state)) {
logInfo(s"Removing executor ${exec.fullId} because it is $state")
//如果一個app已經(jīng)執(zhí)行完了疲扎,將它的信息反饋在Web UI上
if (!appInfo.isFinished) {
appInfo.removeExecutor(exec)
}
exec.worker.removeExecutor(exec)
val normalExit = exitStatus == Some(0)
// 只要retry次數(shù)小于10昵时,那么executor的資源就會不斷的調(diào)整
if (!normalExit) {
if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
//調(diào)整資源
schedule()
} else {
val execs = appInfo.executors.values
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
s"${appInfo.retryCount} times; removing it")
removeApplication(appInfo, ApplicationState.FAILED)
}
}
}
}
}
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
}
}
</code>

(7)Driver轉(zhuǎn)態(tài)發(fā)生變化,進行相應(yīng)的操作
<code>
case DriverStateChanged(driverId, state, exception) => {
state match {
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
}
</code>

(8)心跳機制,通過該機制master和worker保持聯(lián)系
<code>
case Heartbeat(workerId, worker) => {
idToWorker.get(workerId) match {
case Some(workerInfo) =>
//更新worker的最后一次心跳時間
workerInfo.lastHeartbeat = System.currentTimeMillis()
case None =>
if (workers.map(_.id).contains(workerId)) {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" Asking it to re-register.")
worker.send(ReconnectWorker(masterUrl))
} else {
logWarning(s"Got heartbeat from unregistered worker $workerId." +
" This worker was never registered, so ignoring the heartbeat.")
}
}
}
</code>
(9)master對于app的狀態(tài)的處理
<code>
case MasterChangeAcknowledged(appId) => {
idToApp.get(appId) match {
case Some(app) =>
logInfo("Application has been re-registered: " + appId)
app.state = ApplicationState.WAITING
case None =>
logWarning("Master change ack from unknown app: " + appId)
}
if (canCompleteRecovery) { completeRecovery() }
}
</code>

(10)worker調(diào)度狀態(tài)改變響應(yīng)
<code>
case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
idToWorker.get(workerId) match {
case Some(worker) =>
logInfo("Worker has been re-registered: " + workerId)
worker.state = WorkerState.ALIVE
val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
for (exec <- validExecutors) {
val app = idToApp.get(exec.appId).get
val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
worker.addExecutor(execInfo)
execInfo.copyState(exec)
}
for (driverId <- driverIds) {
drivers.find(_.id == driverId).foreach { driver =>
driver.worker = Some(worker)
driver.state = DriverState.RUNNING
worker.drivers(driverId) = driver
}
}
case None =>
logWarning("Scheduler state from unknown worker: " + workerId)
}
if (canCompleteRecovery) { completeRecovery() }
}
</code>
(11)沒有注冊的app將認(rèn)為已經(jīng)完成了并移除
<code>
case UnregisterApplication(applicationId) =>
logInfo(s"Received unregister request from application $applicationId")
idToApp.get(applicationId).foreach(finishApplication)
</code>
(12)通過worker是否超時椒丧,從而判斷worker是否dead
<code>
case CheckForWorkerTimeOut => {
//移除Dead worker壹甥,如果系統(tǒng)當(dāng)前時間-Worker超時(1min)>worker最后心跳時間,判斷worker為dead并移除
timeOutDeadWorkers()
}
</code>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末壶熏,一起剝皮案震驚了整個濱河市句柠,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖溯职,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件精盅,死亡現(xiàn)場離奇詭異,居然都是意外死亡谜酒,警方通過查閱死者的電腦和手機叹俏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來甚带,“玉大人她肯,你說我怎么就攤上這事∮ス螅” “怎么了晴氨?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長碉输。 經(jīng)常有香客問我籽前,道長,這世上最難降的妖魔是什么敷钾? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任枝哄,我火速辦了婚禮,結(jié)果婚禮上阻荒,老公的妹妹穿的比我還像新娘挠锥。我一直安慰自己,他們只是感情好侨赡,可當(dāng)我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布蓖租。 她就那樣靜靜地躺著,像睡著了一般羊壹。 火紅的嫁衣襯著肌膚如雪蓖宦。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天油猫,我揣著相機與錄音稠茂,去河邊找鬼。 笑死情妖,一個胖子當(dāng)著我的面吹牛睬关,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播毡证,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼电爹,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了情竹?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎秦效,沒想到半個月后雏蛮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡阱州,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年挑秉,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片苔货。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡犀概,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出夜惭,到底是詐尸還是另有隱情姻灶,我是刑警寧澤,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布诈茧,位于F島的核電站产喉,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏敢会。R本人自食惡果不足惜曾沈,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望鸥昏。 院中可真熱鬧塞俱,春花似錦、人聲如沸吏垮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽惫皱。三九已至像樊,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間旅敷,已是汗流浹背生棍。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留媳谁,地道東北人涂滴。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像晴音,于是被迫代替她去往敵國和親柔纵。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容