上一篇文章已經(jīng)已經(jīng)執(zhí)行到 Client 向 masterEndpoint 發(fā)送了 RequestSubmitDriver 信息印屁,下面就看看 master 怎么注冊 driver 信息,并且怎么讓 worker 去啟動 driver 的留量。
一,org.apache.spark.deploy.master.Master
這個 Master 就是前面 Client 發(fā)送的對象哟冬,是一個 ThreadSafeRpcEndpoint楼熄。內(nèi)部的 receiveAndReply
這個方法在監(jiān)聽外部發(fā)來信息。下面就來看這個方法浩峡。
1孝赫,receiveAndReply 方法
這個方法內(nèi)部會根據(jù)發(fā)送過來的消息做模式匹配,我們找到 Client 發(fā)送過來的 RequestSubmitDriver 這個消息對應(yīng)代碼红符,如下:
// 匹配到 Client 發(fā)送過來的消息
case RequestSubmitDriver(description) =>
// 判斷當(dāng)前 master 的狀態(tài)是否為 alive
if (state != RecoveryState.ALIVE) {
// 如果不是 alive 則回復(fù) driver 提交失敗
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
// 根據(jù) client 發(fā)過來的 driver 信息創(chuàng)建 driver青柄,然后持久化 driver
// 然后將 driver 加入到等待隊(duì)列中去
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
// 將 driver 加入到 HashSet 中去
drivers.add(driver)
// 開始調(diào)度
schedule()
// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
這段代碼伐债,做了這么一些操作:判斷當(dāng)前 master 的狀態(tài)是否為 alive ,如果不是則回復(fù)消息說:提交失敗致开,如果是則根據(jù)傳遞過來的 driver 信息創(chuàng)建 driver 對象(通過 createDriver 方法創(chuàng)建)并將其持久化峰锁,加入到等待隊(duì)列中去,然后開始執(zhí)行調(diào)度算法 schduler双戳。
這里涉及到連個方法虹蒋,分別可以看一下,一個是 createDriver 方法飒货,一個是 schduler 方法魄衅。
2,createDriver 方法
// 創(chuàng)建 driver 對象
private def createDriver(desc: DriverDescription): DriverInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
// 通過系統(tǒng)當(dāng)前時間生成一個 driverId
// 然后將系統(tǒng)當(dāng)前時間塘辅,driverId晃虫,DriverDescription,日期 這些信息封裝成一個 DriverInfo
new DriverInfo(now, newDriverId(date), desc, date)
}
這個方法主要是通過當(dāng)前時間生成一個 driverId扣墩,然后將當(dāng)前時間哲银,DriverDescription 等參數(shù)封裝成一個 DriverInfo 對象。
3呻惕,schduler 方法
該方法在 master 中會被多次調(diào)用荆责,每當(dāng) driver 的等待隊(duì)列中數(shù)據(jù)發(fā)生變動或者集群資源發(fā)生變化都會掉用這個方法。這個方法主要是為當(dāng)前 driver 的等待隊(duì)列分配資源的亚脆。
private def schedule(): Unit = {
// 首先判斷當(dāng)前 master 的狀態(tài)是否為 alive 的做院,如果不是 alive 則不往下執(zhí)行
if (state != RecoveryState.ALIVE) {
return
}
// Random.shuffle 這個方法主要是隨機(jī)分散各個元素,具體代碼可以點(diǎn)進(jìn)去看
// 這里主要是將集群中 state 為 alive 的 worker 帥選出來濒持,然后隨機(jī)打亂
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
// 當(dāng)前 alive 的 worker 數(shù)量
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
// 將等待分配資源的 driver 隊(duì)列中的所有 driver 進(jìn)行遍歷
// 然后為每個 driver 遍歷一遍所有的 alive worker键耕,當(dāng)碰到 worker 的可用內(nèi)存和比當(dāng)前隊(duì)列中
// 等待的 driver 所需要的內(nèi)存要大并且 worker 的 core 數(shù)量也滿足 driver 的需求時
// 就會調(diào)用 launcherDriver 方法去將 driver 發(fā)送對應(yīng)的 worker 上去執(zhí)行
for (driver <- waitingDrivers.toList) {
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
// 找到符合條件的 worker
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
// 將該 driver 從等待隊(duì)列中移除
waitingDrivers -= driver
// 標(biāo)記當(dāng)前 driver 為 launched
launched = true
}
// 移到下一個 driver 上
curPos = (curPos + 1) % numWorkersAlive
}
}
// 調(diào)用 startExecutorsOnWorkers 方法
startExecutorsOnWorkers()
}
這個 schduler 方法會遍歷等待分配資源的 driver 隊(duì)列,為每個 driver 遍歷一遍 alive 的 worker弥喉,找到資源滿足的 worker郁竟,然后調(diào)用 launchDriver 方法玛迄,將該 driver 在這個 worker 上啟動由境,移除掉等待隊(duì)列中當(dāng)前 driver,然后調(diào)用 startExecutorsOnWorkers 啟動 executor蓖议。
這里又有兩個方法虏杰,一個是 launchDriver 方法,一個是 startExecutorsOnWorkers 方法去啟動 executor勒虾,startExecutorsOnWorkers 這個方法放到下面文章里講纺阔,這篇文章主要講 driver 注冊和啟動。
4修然,launchDriver 方法
這個方法主要是更新一些信息(worker 中的資源變更笛钝,worker 中啟動的 driver 信息記錄质况;driver 中添加上 worker 的信息),然后將向?qū)?yīng)的 worker 發(fā)送 LaunchDriver 的消息玻靡。
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
// 這里是將 workerInfo 中添加上啟動 driver 的信息结榄,內(nèi)部也會減去 driver 使用掉的資源
worker.addDriver(driver)
// 將 driver 啟動的 worker 信息記錄到 driver 中
driver.worker = Some(worker)
// 給 worker 發(fā)送 LaunchDriver 的信息
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
// 標(biāo)記當(dāng)前 driver 狀態(tài)為 running 狀態(tài)
driver.state = DriverState.RUNNING
}
通過把啟動的 driver 信息記錄到對應(yīng)的 worker 信息中,再將對應(yīng)的 worker 信息記錄到 driver 里囤捻,然后給 worker 發(fā)送消息讓 worker 啟動 driver臼朗,標(biāo)記當(dāng)前的 driver 狀態(tài)為 running。
這里會給 worker 發(fā)送 LaunchDriver 的消息蝎土,下面去看下 worker 中是怎么處理這個消息的视哑。
二,org.apache.spark.deploy.worker.Worker
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging
從繼承關(guān)系上可以看出 worker 也是 RpcEndPoint誊涯,所以直接找到它的 receive 方法挡毅,然后根據(jù)模式匹配找到 LaunchDriver 這個匹配下看操作邏輯即可。
case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
// 將 driver 信息封裝到一個 runner 內(nèi)
val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
securityMgr)
// 然后將這個 runner 保存到一個 HashMap 中
drivers(driverId) = driver
// 啟動這個 runner
driver.start()
// 更新當(dāng)前 worker 的資源信息
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
這里會將 driver 的信息封裝到一個 DriverRunner 里面醋拧,然后再降這個 runner 保存到內(nèi)存的一個 HashMap 中慷嗜,然后開啟這個 ruuner,更新當(dāng)前 worker 的資源信息丹壕。
到這里我們需要去看 DriverRunner 里是怎么操作的庆械。
三,org.apache.spark.deploy.worker.DriverRunner
DriverRunner 是在 standalone cluster 部署模式下用來執(zhí)行 driver 操作的菌赖,包括當(dāng) driver 掛掉之后的自動重啟缭乘。
1,start 方法
前面調(diào)用的是 runner 的 start 方法琉用,所以我們直接看這個 start 方法:
private[worker] def start() = {
// 開一個線程
new Thread("DriverRunner for " + driverId) {
override def run() {
var shutdownHook: AnyRef = null
try {
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
logInfo(s"Worker shutting down, killing driver $driverId")
kill()
}
// 準(zhǔn)備 driver 的 jar 包并且執(zhí)行 driver堕绩,并返回一個 exitCode
val exitCode = prepareAndRunDriver()
// 根據(jù) exitCode 設(shè)置 finalState,一共有三種邑时,分別為:FINISHED奴紧,KILLED,F(xiàn)AILED
finalState = if (exitCode == 0) {
Some(DriverState.FINISHED)
} else if (killed) {
Some(DriverState.KILLED)
} else {
Some(DriverState.FAILED)
}
} catch {
case e: Exception =>
kill()
finalState = Some(DriverState.ERROR)
finalException = Some(e)
} finally {
if (shutdownHook != null) {
ShutdownHookManager.removeShutdownHook(shutdownHook)
}
}
// 然后將 driverId 和 driver 執(zhí)行結(jié)果 finalState 以及一些異常信息發(fā)送給 worker
worker.send(DriverStateChanged(driverId, finalState.get, finalException))
}
}.start()
}
這里主要是調(diào)用了一個 prepareAndRunDriver 這個方法晶丘,返回了一個結(jié)果碼黍氮,然后把結(jié)果碼轉(zhuǎn)換為 finalState ,然后發(fā)送給 worker浅浮。
所以我們直接去找 prepareAndRunDriver 這個方法沫浆。
2,prepareAndRunDriver 方法
private[worker] def prepareAndRunDriver(): Int = {
// 創(chuàng)建 driver 的工作目錄
val driverDir = createWorkingDirectory()
// 下載 driver 的 jar 包到工作目錄下
val localJarFilename = downloadUserJar(driverDir)
def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
case "{{USER_JAR}}" => localJarFilename
case other => other
}
// 創(chuàng)建 ProcessBuilder
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
runDriver(builder, driverDir, driverDesc.supervise)
}
這個方法主要做了這些事:創(chuàng)建 driver 的工作目錄滚秩,將 driver 的 jar 包下載到工作目錄下专执,然后創(chuàng)建 ProcessBuilder,傳入 driver 的執(zhí)行命令郁油,然后調(diào)用 runDriver 方法本股。
下面我們看下 runDriver 方法攀痊。
3,runDriver 方法
private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
builder.directory(baseDir)
// 初始化操作
def initialize(process: Process): Unit = {
// 創(chuàng)建 stout 文件
val stdout = new File(baseDir, "stdout")
// 將 process 的 InputStream 流重定向?yàn)?stout 文件
CommandUtils.redirectStream(process.getInputStream, stdout)
// 創(chuàng)建 stderr 文件
val stderr = new File(baseDir, "stderr")
// 將 builder 命令格式化處理
val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
Files.append(header, stderr, StandardCharsets.UTF_8)
// 將 process 的 ErrStream 重定向到 stderr 文件
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
// 調(diào)用 runCommandWithRetry
runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
該方法主要是定義了一個 initialize 方法拄显,里面會將傳入的 process 的輸入流和 err 流重定向到自定義的兩個文件中去蚕苇,然后調(diào)用 runCommandWithRetry 這個方法。
看下 runCommandWithRetry 這個方法凿叠。
4涩笤,runCommandWithRetry 方法
private[worker] def runCommandWithRetry(
command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
// 退出碼
var exitCode = -1
// 提交重試的燈帶時間
var waitSeconds = 1
// A run of this many seconds resets the exponential back-off.
val successfulRunDuration = 5
var keepTrying = !killed
while (keepTrying) {
logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
synchronized {
// 如果被 kill 則返回 exitcode
if (killed) { return exitCode }
// 執(zhí)行 command 命令,啟動 driver 進(jìn)程
process = Some(command.start())
// 調(diào)用上面定義好的 initialize 方法盒件,將一些流的輸出文件做重定向
initialize(process.get)
}
val processStart = clock.getTimeMillis()
exitCode = process.get.waitFor()
// check if attempting another run
keepTrying = supervise && exitCode != 0 && !killed
if (keepTrying) {
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
waitSeconds = 1
}
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
sleeper.sleep(waitSeconds)
waitSeconds = waitSeconds * 2 // exponential back-off
}
}
exitCode
}
這里是真正運(yùn)行 driver 進(jìn)程的地方蹬碧,開啟 driver 進(jìn)程后會使用上面 runDriver 中定義好的 initialize 方法去將 driver 進(jìn)程中的一些流的輸出文件做重定向操作,并返回 exitcode炒刁。
至此恩沽,driver 就已經(jīng)在 master 上注冊好了,并且 master 也分配合適的 worker 啟動了該 driver 進(jìn)程翔始。
我們在 DriverRunner start 方法的最后會調(diào)用 worker.send(DriverStateChanged(driverId, finalState.get, finalException))
這個方法罗心,給 worker 發(fā)送 driver 狀態(tài)變化的消息。
四城瞎,org.apache.spark.deploy.worker.Worker
這里我們看下 worker 是怎么處理的渤闷。
在 woker 的 receive 方法的模式匹配中是這么操作的:
case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
handleDriverStateChanged(driverStateChanged)
會去調(diào)用 handleDriverStateChanged 這個方法。
1脖镀,handleDriverStateChanged 方法
我們再看下 handleDriverStateChanged 這個方法:
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception
val state = driverStateChanged.state
// 根據(jù) state 做匹配打印日志
state match {
case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FAILED =>
logWarning(s"Driver $driverId exited with failure")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
logInfo(s"Driver $driverId was killed by user")
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
// 向 master 發(fā)送 driverStateChanged 消息
sendToMaster(driverStateChanged)
// 將該 driver 從 drivers 移除到 finishedDrivers 中去
val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
trimFinishedDriversIfNecessary()
// 更新 worker 節(jié)點(diǎn)的資源情況
memoryUsed -= driver.driverDesc.mem
coresUsed -= driver.driverDesc.cores
}
主要是做了這些事:根據(jù)發(fā)送過來的 state 做模式匹配飒箭,打印對應(yīng)的 log。然后把這個 driverStateChanged 消息轉(zhuǎn)發(fā)給 master蜒灰,最后再更新下當(dāng)前 worker 的一些存儲數(shù)據(jù)弦蹂。
最后在看下 master 收到這個 driverStateChanged 消息是怎么處理的。
五强窖,org.apache.spark.deploy.master.Master
在其 recieve 方法中可以找到匹配到 driverStageChanged 消息后的操作:
case DriverStateChanged(driverId, state, exception) =>
state match {
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
// 調(diào)用 removeDriver 方法
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
在這里是調(diào)用了 removeDriver 方法凸椿,我們下面就看下這個方法。
1翅溺,removeDriver 方法
private def removeDriver(
driverId: String,
finalState: DriverState,
exception: Option[Exception]) {
// 根據(jù) driver id 進(jìn)行模式匹配
drivers.find(d => d.id == driverId) match {
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
// 從 drivers 集合中移除當(dāng)前 driver
drivers -= driver
if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
completedDrivers.trimStart(toRemove)
}
// 將 driver 添加到 completedDrivers 中去
completedDrivers += driver
// 從持久化引擎中移除
persistenceEngine.removeDriver(driver)
// 更新 driver 的狀態(tài)和 exception 并從 driver 的 worker 中移除掉當(dāng)前 driver
driver.state = finalState
driver.exception = exception
driver.worker.foreach(w => w.removeDriver(driver))
schedule()
case None =>
logWarning(s"Asked to remove unknown driver: $driverId")
}
}
這個方法主要是將 master 中資源做恢復(fù)操作脑漫,會根據(jù)當(dāng)前退出的 driver 做模式匹配,找到這個 driver未巫,然后將其從 drivers 的集合中移除窿撬,添加到 completedDrivers 中去启昧,然后從持久化引擎中移除掉叙凡,更新 driver 的狀態(tài),并從 driver 持有的 worker 中移除掉結(jié)束的這個 driver密末。然后再調(diào)用 schedule 方法握爷,讓釋放資源重新調(diào)度跛璧。
至此,driver 的注冊新啼,啟動追城,以及退出后資源回收,都結(jié)束了燥撞。