Job Submission Flow
Overview
Now that we have walked through the startup flow of the Spark Master and of the Worker, the next thing to run is the Executor process on the Worker. This article continues with the Executor startup and the job submission flow.
Spark-submit
A job is submitted to the cluster via spark-submit.
The launch script starts its main class; we use WordCount as the example here:
`spark-submit --class cn.itcast.spark.WordCount`
bin/spark-class -> org.apache.spark.deploy.SparkSubmit, whose main method is invoked.
The doRunMain method is handed the main class of the user-defined Spark application, here class cn.kinge.spark.WordCount.
A reference to that class is obtained via reflection:
mainClass = Utils.classForName(childMainClass)
and the main method of class cn.kinge.spark.WordCount is then invoked, again via reflection.
Let's look at SparkSubmit's main method:
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  // match on the requested action
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
The action here is SUBMIT, so the submit method is called:
private[spark] def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
  def doRunMain(): Unit = {
    ......
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          // childMainClass is the fully qualified name of the class that contains your App's main
          runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
        }
      })
    } catch {
      ......
    }
  }
  ......
  // call the doRunMain defined above
  doRunMain()
}
submit calls doRunMain(), which then calls runMain. Let's look at runMain:
private def runMain(
  ......
  try {
    // load the class via reflection
    mainClass = Class.forName(childMainClass, true, loader)
  } catch {
    ......
  }
  // obtain the main method via reflection
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }
  ......
  try {
    // invoke the App's main method
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}
This is the heart of the flow: as the comments above show, the main method of the class we wrote is invoked via reflection. That completes the broad picture of spark-submit.
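To make the reflection step concrete, here is a minimal, self-contained sketch (not Spark's actual code; the class name and arguments are placeholders) of how a static main method can be located and invoked by name:

```scala
import java.lang.reflect.Modifier

object ReflectiveMainLauncher {
  def main(args: Array[String]): Unit = {
    // hypothetical application class and arguments, for illustration only
    val childMainClass = "cn.kinge.spark.WordCount"
    val childArgs = Seq("hdfs://input", "hdfs://output")

    // load the class by name, as SparkSubmit does via Utils.classForName
    val mainClass = Class.forName(childMainClass, true, Thread.currentThread().getContextClassLoader)

    // look up the static main(Array[String]) method
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "The main method must be static")

    // first argument is null because the method is static; the array is passed as the single parameter
    mainMethod.invoke(null, childArgs.toArray)
  }
}
```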
SparkSubmit sequence diagram
Executor Startup Flow
After SparkSubmit has invoked our program's main method via reflection, our own code starts to run. A Spark program needs to create a SparkContext object, so that is where we begin.
The SparkContext constructor is long; the parts that matter here are the following:
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
  ......
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // create the driver-side environment via SparkEnv.createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }
  // createSparkEnv is called here and returns a SparkEnv object holding many important members, most importantly the ActorSystem
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  // create the TaskScheduler
  // Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  // create the DAGScheduler
  dagScheduler = new DAGScheduler(this)

  // start the TaskScheduler
  taskScheduler.start()
  ......
}
The SparkContext constructor mainly does three things: it creates a SparkEnv, a TaskScheduler and a DAGScheduler. Let's first look at what createTaskScheduler does.
// create a TaskScheduler for the given master URL
private def createTaskScheduler(
  ......
  // match on the URL to choose the deployment mode
  master match {
    ......
    // this is Spark's standalone mode
    case SPARK_REGEX(sparkUrl) =>
      // first create the TaskScheduler
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      // very important
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      // initialize the scheduler; the default scheduling mode is FIFO
      scheduler.initialize(backend)
      (backend, scheduler)
    ......
  }
}
Matching on the master URL selects the standalone mode. A SparkDeploySchedulerBackend and a TaskSchedulerImpl are then created; these two objects are the core of task scheduling. Finally scheduler.initialize(backend) is called to initialize the scheduler.
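For reference, in Spark 1.x TaskSchedulerImpl.initialize essentially wires the backend in and builds the scheduling pool; the following is a simplified sketch from memory rather than a verbatim copy of the source:

```scala
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // the root pool holds all TaskSetManagers
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = schedulingMode match {
    case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)      // the default
    case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
  }
  schedulableBuilder.buildPools()
}
```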
Once initialization is done we return to the SparkContext constructor, which goes on to call taskScheduler.start() to start the TaskScheduler. Let's look at the start method:
override def start() {
  // call the start method of the backend implementation
  backend.start()
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    import sc.env.actorSystem.dispatcher
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
      SPECULATION_INTERVAL milliseconds) {
      Utils.tryOrExit { checkSpeculatableTasks() }
    }
  }
}
The backend here is SparkDeploySchedulerBackend, so its start is what runs:
override def start() {
  // CoarseGrainedSchedulerBackend's start method; it creates the DriverActor
  super.start()

  // The endpoint for executors to talk to us
  // the following prepares the arguments needed to launch the Java child process
  val driverUrl = AkkaUtils.address(
    AkkaUtils.protocol(actorSystem),
    SparkEnv.driverActorSystemName,
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port"),
    CoarseGrainedSchedulerBackend.ACTOR_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
    .map(Utils.splitCommandString).getOrElse(Seq.empty)
  val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
  val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
    .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

  // When testing, expose the parent class path to the child. This is processed by
  // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
  // when the assembly is built with the "*-provided" profiles enabled.
  val testingClassPath =
    if (sys.props.contains("spark.testing")) {
      sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
    } else {
      Nil
    }

  // Start executors with a few necessary configs for registering with the scheduler
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  // assemble the command; it will ultimately launch the org.apache.spark.executor.CoarseGrainedExecutorBackend child process
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  // wrap the important parameters in an ApplicationDescription
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    appUIAddress, sc.eventLogDir, sc.eventLogCodec)
  // the ClientActor is created inside AppClient
  client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
  // start the ClientActor
  client.start()
  waitForRegistration()
}
Here the parameters needed to start the Executor (class name plus arguments) are assembled and wrapped into an ApplicationDescription, which is then handed to a newly created AppClient whose start method is called.
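For orientation, the two classes involved look roughly like this in Spark 1.x (field names simplified and reproduced from memory; treat this as an approximation, not the exact source):

```scala
// what the Master needs in order to schedule Executors for this application
class ApplicationDescription(
    val name: String,              // application name
    val maxCores: Option[Int],     // overall core cap, if any
    val memoryPerSlave: Int,       // memory per Executor in MB
    val command: Command,          // how to launch the Executor JVM
    val appUiUrl: String)

// the child-process launch recipe carried inside ApplicationDescription
case class Command(
    mainClass: String,             // org.apache.spark.executor.CoarseGrainedExecutorBackend
    arguments: Seq[String],        // --driver-url, --executor-id, --cores, ...
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String])
```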
AppClient creation sequence diagram
AppClient's start method
Next let's look at the start method:
def start() {
// Just launch an actor; it will call back into the listener.
actor = actorSystem.actorOf(Props(new ClientActor))
}
Inside start, the ClientActor that communicates with the Master is created; its preStart method will then run and register with the Master. Let's look at preStart:
override def preStart() {
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  try {
    // the ClientActor registers with the Master
    registerWithMaster()
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      context.stop(self)
  }
}
In the end the following method is called to register with every Master:
def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    // obtain a reference to the Master via actorSelection
    val actor = context.actorSelection(masterAkkaUrl)
    // send the Master an asynchronous message registering the App
    actor ! RegisterApplication(appDescription)
  }
}
The ClientActor's registration message carries the ApplicationDescription, which contains the requested resources, the Executor class to be launched, and its parameters.
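The deploy messages exchanged here are plain case classes; roughly, from Spark 1.x DeployMessages (simplified, from memory):

```scala
// sent by the ClientActor (AppClient) to the Master
case class RegisterApplication(appDescription: ApplicationDescription)

// sent back by the Master once the app has been registered and persisted
case class RegisteredApplication(appId: String, masterUrl: String)
```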
The Master's receiver:
case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    // create the app; sender is the ClientActor
    val app = createApplication(description, sender)
    // register the app
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // persist the app
    persistenceEngine.addApplication(app)
    // reply to the ClientActor that the app was registered successfully
    sender ! RegisteredApplication(app.id, masterUrl)
    // TODO schedule resources
    schedule()
  }
}
registerApplication(app)
def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.path.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  // put the app into the Master's bookkeeping collections
  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  actorToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
}
The Master stores the received information in its collections, persists it, and sends a RegisteredApplication message back to the ClientActor. It then runs schedule(), which walks the workers collection and calls launchExecutor; a simplified sketch of that loop is shown next, followed by launchExecutor itself.
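As a rough idea of what schedule() does for a standalone application (heavily simplified from the Spark 1.x source; driver scheduling and the spread-out placement strategy are omitted):

```scala
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) return
  // for every app that still needs cores, walk the alive workers with enough free
  // memory and hand out executors until the app's demand is met
  for (app <- waitingApps if app.coresLeft > 0) {
    val usableWorkers = workers.toArray
      .filter(_.state == WorkerState.ALIVE)
      .filter(w => w.memoryFree >= app.desc.memoryPerSlave && w.coresFree > 0)
      .sortBy(_.coresFree).reverse
    for (worker <- usableWorkers if app.coresLeft > 0) {
      val coresToAssign = math.min(worker.coresFree, app.coresLeft)
      val exec = app.addExecutor(worker, coresToAssign)
      launchExecutor(worker, exec)   // shown below
      app.state = ApplicationState.RUNNING
    }
  }
}
```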
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // record how many resources are now in use on this worker
  worker.addExecutor(exec)
  // the Master sends the Worker a message to launch an Executor
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // the Master tells the ClientActor that the executor has been added
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
Here the Master sends the Worker the message that launches the Executor:
worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
application.desc contains the information needed to launch the Executor class.
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  ......
  appDirectories(appId) = appLocalDirs
  // create an ExecutorRunner; this is important: it holds the Executor's launch configuration and arguments
  val manager = new ExecutorRunner(
    appId,
    execId,
    appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
    cores_,
    memory_,
    self,
    workerId,
    host,
    webUi.boundPort,
    publicAddress,
    sparkHome,
    executorDir,
    akkaUrl,
    conf,
    appLocalDirs, ExecutorState.LOADING)
  executors(appId + "/" + execId) = manager
  // TODO start the ExecutorRunner
  manager.start()
  ......
    }
  }
}
The Worker's receiver has received the LaunchExecutor message; the appDesc object holds the Command, the Executor implementation class and its arguments. manager.start() then creates a thread:
def start() {
  // start a thread
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    // a child thread is used to launch the Executor child process on behalf of the Worker
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = new Thread() {
    override def run() {
      killProcess(Some("Worker shutting down"))
    }
  }
  Runtime.getRuntime.addShutdownHook(shutdownHook)
}
The thread invokes fetchAndRunExecutor(); let's look at that method:
def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
      sparkHome.getAbsolutePath, substituteVariables)
    // build the command
    val command = builder.command()
    logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
    builder.directory(executorDir)
    builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    // launch the child process
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      command.mkString("\"", "\" \"", "\""), "=" * 40)
    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    // the process is now running; wait for its exit signal
    val exitCode = process.waitFor()
    ......
  }
}
Here the class name and arguments are assembled (the details of the assembly don't matter much); in the end builder.start() spawns a child process at the system runtime level, and that process's main class is CoarseGrainedExecutorBackend. At this point the Executor process has been launched.
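To illustrate what launching that child JVM boils down to, here is a minimal, hypothetical sketch using java.lang.ProcessBuilder; the real CommandUtils.buildProcessBuilder adds classpath, memory and environment handling on top of this, so treat the paths and options below as placeholders:

```scala
import java.io.File
import scala.collection.JavaConverters._

object LaunchChildJvm {
  def main(args: Array[String]): Unit = {
    // illustrative only: spawn a JVM running a given main class, the way the Worker does for the Executor
    val javaBin = new File(new File(System.getProperty("java.home"), "bin"), "java").getAbsolutePath
    val command = Seq(javaBin,
      "-cp", System.getProperty("java.class.path"),
      "-Xmx1024m",
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      "--driver-url", "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler")

    val builder = new ProcessBuilder(command.asJava)
    builder.directory(new File("."))             // working directory, analogous to executorDir
    builder.redirectOutput(new File("stdout"))   // redirect logs to files, as the Worker does
    builder.redirectError(new File("stderr"))

    val process = builder.start()                // the child process is now running
    val exitCode = process.waitFor()             // block until it exits
    println(s"child exited with code $exitCode")
  }
}
```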
Executor creation sequence diagram
Starting the Executor's task-scheduling objects
Once the Executor process is up, its main method runs first. The code of main is as follows:
// entry point of the Executor process
def main(args: Array[String]) {
  ......
  // parse the arguments
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        printUsageAndExit()
    }
  }
  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
    appId == null) {
    printUsageAndExit()
  }
  // start the Executor
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
main then calls the run method:
private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL])
  ......
  // create the CoarseGrainedExecutorBackend actor via the actorSystem
  // CoarseGrainedExecutorBackend talks to the DriverActor
  env.actorSystem.actorOf(
    Props(classOf[CoarseGrainedExecutorBackend],
      driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
    name = "Executor")
  ......
  }
  env.actorSystem.awaitTermination()
  }
}
The run method creates the CoarseGrainedExecutorBackend actor, which is used to talk to the DriverActor; its preStart lifecycle method is invoked next:
override def preStart() {
  logInfo("Connecting to driver: " + driverUrl)
  // the Executor establishes a connection to the DriverActor
  driver = context.actorSelection(driverUrl)
  // the Executor sends a message to the DriverActor
  driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
The Executor sends its registration message to the DriverActor:
driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
When the DriverActor's receiver gets the message:
def receiveWithLogging = {
  // the registration message sent by the Executor to the DriverActor
  case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
    Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
    if (executorDataMap.contains(executorId)) {
      sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else {
      logInfo("Registered executor: " + sender + " with ID " + executorId)
      // the DriverActor replies to the Executor that registration succeeded
      sender ! RegisteredExecutor
      addressToExecutorId(sender.path.address) = executorId
      totalCoreCount.addAndGet(cores)
      totalRegisteredExecutors.addAndGet(1)
      val (host, _) = Utils.parseHostPort(hostPort)
      // wrap up the Executor's information
      val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors
      CoarseGrainedSchedulerBackend.this.synchronized {
        // add the Executor's info object to the map
        executorDataMap.put(executorId, data)
        if (numPendingExecutors > 0) {
          numPendingExecutors -= 1
          logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
        }
      }
      listenerBus.post(
        SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
      // this is what will later hand real work (tasks) to the executors
      makeOffers()
    }
In its receiver the DriverActor stores the Executor's information in a map and sends the confirmation message sender ! RegisteredExecutor back to CoarseGrainedExecutorBackend.
override def receiveWithLogging = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    val (hostname, _) = Utils.parseHostPort(hostPort)
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
On receiving this message, CoarseGrainedExecutorBackend creates an Executor object that will actually run tasks. At this point the Executor has started successfully, and all that remains is to wait for tasks to be scheduled and executed.
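As a preview of what that waiting looks like, the backend's receive loop later handles a LaunchTask message roughly as follows (simplified from the Spark 1.x source; the deserialization details are approximate):

```scala
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    // deserialize the TaskDescription sent by the DriverActor and hand it to the Executor,
    // which runs it on its internal thread pool
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
```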